From 8185bb53476378443240d57f7d844347d5fae1bf Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Fri, 6 Mar 2026 08:27:56 -0800 Subject: [PATCH] CREATE SUBSCRIPTION ... SERVER. Allow CREATE SUBSCRIPTION to accept a foreign server using the SERVER clause instead of a raw connection string using the CONNECTION clause. * Enables a user with sufficient privileges to create a subscription using a foreign server by name without specifying the connection details. * Integrates with user mappings (and other FDW infrastructure) using the subscription owner. * Provides a layer of indirection to manage multiple subscriptions to the same remote server more easily. Also add CREATE FOREIGN DATA WRAPPER ... CONNECTION clause to specify a connection_function. To be eligible for a subscription, the foreign server's foreign data wrapper must specify a connection_function. Add connection_function support to postgres_fdw, and bump postgres_fdw version to 1.3. Bump catversion. Reviewed-by: Ashutosh Bapat Reviewed-by: Shlok Kyal Reviewed-by: Masahiko Sawada Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.camel@j-davis.com --- contrib/postgres_fdw/Makefile | 2 +- contrib/postgres_fdw/connection.c | 309 +++++++++++------- .../postgres_fdw/expected/postgres_fdw.out | 8 + contrib/postgres_fdw/meson.build | 2 + .../postgres_fdw/postgres_fdw--1.2--1.3.sql | 12 + contrib/postgres_fdw/postgres_fdw.control | 2 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 7 + contrib/postgres_fdw/t/010_subscription.pl | 71 ++++ doc/src/sgml/logical-replication.sgml | 4 +- doc/src/sgml/postgres-fdw.sgml | 26 ++ .../sgml/ref/alter_foreign_data_wrapper.sgml | 20 ++ doc/src/sgml/ref/alter_subscription.sgml | 18 +- .../sgml/ref/create_foreign_data_wrapper.sgml | 20 ++ doc/src/sgml/ref/create_server.sgml | 7 + doc/src/sgml/ref/create_subscription.sgml | 16 +- src/backend/catalog/dependency.c | 11 + src/backend/catalog/pg_subscription.c | 38 ++- src/backend/catalog/system_views.sql | 2 +- src/backend/commands/foreigncmds.c | 58 +++- src/backend/commands/subscriptioncmds.c | 203 +++++++++++- src/backend/foreign/foreign.c | 86 +++++ src/backend/parser/gram.y | 22 ++ src/backend/replication/logical/worker.c | 24 +- src/bin/pg_dump/pg_dump.c | 39 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 6 +- src/bin/psql/tab-complete.in.c | 11 +- src/include/catalog/catversion.h | 2 +- src/include/catalog/pg_foreign_data_wrapper.h | 3 + src/include/catalog/pg_subscription.h | 8 +- src/include/foreign/foreign.h | 3 + src/include/nodes/parsenodes.h | 3 + src/test/regress/expected/oidjoins.out | 2 + src/test/regress/expected/subscription.out | 222 ++++++++----- src/test/regress/regress.c | 7 + src/test/regress/sql/subscription.sql | 55 ++++ 36 files changed, 1075 insertions(+), 255 deletions(-) create mode 100644 contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql create mode 100644 contrib/postgres_fdw/t/010_subscription.pl diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index 8eaf4d263b6..b8c78b58804 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir) SHLIB_LINK_INTERNAL = $(libpq) EXTENSION = postgres_fdw -DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql +DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql postgres_fdw--1.2--1.3.sql REGRESS = postgres_fdw query_cancel ISOLATION = eval_plan_qual diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 311936406f2..7e2b822d161 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -132,6 +132,7 @@ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections); PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect); PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all); +PG_FUNCTION_INFO_V1(postgres_fdw_connection); /* prototypes of private functions */ static void make_new_connection(ConnCacheEntry *entry, UserMapping *user); @@ -476,6 +477,142 @@ pgfdw_security_check(const char **keywords, const char **values, UserMapping *us errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes."))); } +/* + * Construct connection params from generic options of ForeignServer and + * UserMapping. (Some of them might not be libpq options, in which case we'll + * just waste a few array slots.) + */ +static void +construct_connection_params(ForeignServer *server, UserMapping *user, + const char ***p_keywords, const char ***p_values, + char **p_appname) +{ + const char **keywords; + const char **values; + char *appname = NULL; + int n; + + /* + * Add 4 extra slots for application_name, fallback_application_name, + * client_encoding, end marker, and 3 extra slots for scram keys and + * required scram pass-through options. + */ + n = list_length(server->options) + list_length(user->options) + 4 + 3; + keywords = (const char **) palloc(n * sizeof(char *)); + values = (const char **) palloc(n * sizeof(char *)); + + n = 0; + n += ExtractConnectionOptions(server->options, + keywords + n, values + n); + n += ExtractConnectionOptions(user->options, + keywords + n, values + n); + + /* + * Use pgfdw_application_name as application_name if set. + * + * PQconnectdbParams() processes the parameter arrays from start to end. + * If any key word is repeated, the last value is used. Therefore note + * that pgfdw_application_name must be added to the arrays after options + * of ForeignServer are, so that it can override application_name set in + * ForeignServer. + */ + if (pgfdw_application_name && *pgfdw_application_name != '\0') + { + keywords[n] = "application_name"; + values[n] = pgfdw_application_name; + n++; + } + + /* + * Search the parameter arrays to find application_name setting, and + * replace escape sequences in it with status information if found. The + * arrays are searched backwards because the last value is used if + * application_name is repeatedly set. + */ + for (int i = n - 1; i >= 0; i--) + { + if (strcmp(keywords[i], "application_name") == 0 && + *(values[i]) != '\0') + { + /* + * Use this application_name setting if it's not empty string even + * after any escape sequences in it are replaced. + */ + appname = process_pgfdw_appname(values[i]); + if (appname[0] != '\0') + { + values[i] = appname; + break; + } + + /* + * This empty application_name is not used, so we set values[i] to + * NULL and keep searching the array to find the next one. + */ + values[i] = NULL; + pfree(appname); + appname = NULL; + } + } + + *p_appname = appname; + + /* Use "postgres_fdw" as fallback_application_name */ + keywords[n] = "fallback_application_name"; + values[n] = "postgres_fdw"; + n++; + + /* Set client_encoding so that libpq can convert encoding properly. */ + keywords[n] = "client_encoding"; + values[n] = GetDatabaseEncodingName(); + n++; + + /* Add required SCRAM pass-through connection options if it's enabled. */ + if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user)) + { + int len; + int encoded_len; + + keywords[n] = "scram_client_key"; + len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey)); + /* don't forget the zero-terminator */ + values[n] = palloc0(len + 1); + encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey, + sizeof(MyProcPort->scram_ClientKey), + (char *) values[n], len); + if (encoded_len < 0) + elog(ERROR, "could not encode SCRAM client key"); + n++; + + keywords[n] = "scram_server_key"; + len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey)); + /* don't forget the zero-terminator */ + values[n] = palloc0(len + 1); + encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey, + sizeof(MyProcPort->scram_ServerKey), + (char *) values[n], len); + if (encoded_len < 0) + elog(ERROR, "could not encode SCRAM server key"); + n++; + + /* + * Require scram-sha-256 to ensure that no other auth method is used + * when connecting with foreign server. + */ + keywords[n] = "require_auth"; + values[n] = "scram-sha-256"; + n++; + } + + keywords[n] = values[n] = NULL; + + /* Verify the set of connection parameters. */ + check_conn_params(keywords, values, user); + + *p_keywords = keywords; + *p_values = values; +} + /* * Connect to remote server using specified server and user mapping properties. */ @@ -491,127 +628,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user) { const char **keywords; const char **values; - char *appname = NULL; - int n; + char *appname; - /* - * Construct connection params from generic options of ForeignServer - * and UserMapping. (Some of them might not be libpq options, in - * which case we'll just waste a few array slots.) Add 4 extra slots - * for application_name, fallback_application_name, client_encoding, - * end marker, and 3 extra slots for scram keys and required scram - * pass-through options. - */ - n = list_length(server->options) + list_length(user->options) + 4 + 3; - keywords = (const char **) palloc(n * sizeof(char *)); - values = (const char **) palloc(n * sizeof(char *)); - - n = 0; - n += ExtractConnectionOptions(server->options, - keywords + n, values + n); - n += ExtractConnectionOptions(user->options, - keywords + n, values + n); - - /* - * Use pgfdw_application_name as application_name if set. - * - * PQconnectdbParams() processes the parameter arrays from start to - * end. If any key word is repeated, the last value is used. Therefore - * note that pgfdw_application_name must be added to the arrays after - * options of ForeignServer are, so that it can override - * application_name set in ForeignServer. - */ - if (pgfdw_application_name && *pgfdw_application_name != '\0') - { - keywords[n] = "application_name"; - values[n] = pgfdw_application_name; - n++; - } - - /* - * Search the parameter arrays to find application_name setting, and - * replace escape sequences in it with status information if found. - * The arrays are searched backwards because the last value is used if - * application_name is repeatedly set. - */ - for (int i = n - 1; i >= 0; i--) - { - if (strcmp(keywords[i], "application_name") == 0 && - *(values[i]) != '\0') - { - /* - * Use this application_name setting if it's not empty string - * even after any escape sequences in it are replaced. - */ - appname = process_pgfdw_appname(values[i]); - if (appname[0] != '\0') - { - values[i] = appname; - break; - } - - /* - * This empty application_name is not used, so we set - * values[i] to NULL and keep searching the array to find the - * next one. - */ - values[i] = NULL; - pfree(appname); - appname = NULL; - } - } - - /* Use "postgres_fdw" as fallback_application_name */ - keywords[n] = "fallback_application_name"; - values[n] = "postgres_fdw"; - n++; - - /* Set client_encoding so that libpq can convert encoding properly. */ - keywords[n] = "client_encoding"; - values[n] = GetDatabaseEncodingName(); - n++; - - /* Add required SCRAM pass-through connection options if it's enabled. */ - if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user)) - { - int len; - int encoded_len; - - keywords[n] = "scram_client_key"; - len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey)); - /* don't forget the zero-terminator */ - values[n] = palloc0(len + 1); - encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey, - sizeof(MyProcPort->scram_ClientKey), - (char *) values[n], len); - if (encoded_len < 0) - elog(ERROR, "could not encode SCRAM client key"); - n++; - - keywords[n] = "scram_server_key"; - len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey)); - /* don't forget the zero-terminator */ - values[n] = palloc0(len + 1); - encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey, - sizeof(MyProcPort->scram_ServerKey), - (char *) values[n], len); - if (encoded_len < 0) - elog(ERROR, "could not encode SCRAM server key"); - n++; - - /* - * Require scram-sha-256 to ensure that no other auth method is - * used when connecting with foreign server. - */ - keywords[n] = "require_auth"; - values[n] = "scram-sha-256"; - n++; - } - - keywords[n] = values[n] = NULL; - - /* Verify the set of connection parameters. */ - check_conn_params(keywords, values, user); + construct_connection_params(server, user, &keywords, &values, &appname); /* first time, allocate or get the custom wait event */ if (pgfdw_we_connect == 0) @@ -2310,6 +2329,56 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo, } } +/* + * Values in connection strings must be enclosed in single quotes. Single + * quotes and backslashes must be escaped with backslash. NB: these rules are + * different from the rules for escaping a SQL literal. + */ +static void +appendEscapedValue(StringInfo str, const char *val) +{ + appendStringInfoChar(str, '\''); + for (int i = 0; val[i] != '\0'; i++) + { + if (val[i] == '\\' || val[i] == '\'') + appendStringInfoChar(str, '\\'); + appendStringInfoChar(str, val[i]); + } + appendStringInfoChar(str, '\''); +} + +Datum +postgres_fdw_connection(PG_FUNCTION_ARGS) +{ + Oid userid = PG_GETARG_OID(0); + Oid serverid = PG_GETARG_OID(1); + ForeignServer *server = GetForeignServer(serverid); + UserMapping *user = GetUserMapping(userid, serverid); + StringInfoData str; + const char **keywords; + const char **values; + char *appname; + char *sep = ""; + + construct_connection_params(server, user, &keywords, &values, &appname); + + initStringInfo(&str); + for (int i = 0; keywords[i] != NULL; i++) + { + if (values[i] == NULL) + continue; + appendStringInfo(&str, "%s%s = ", sep, keywords[i]); + appendEscapedValue(&str, values[i]); + sep = " "; + } + + if (appname != NULL) + pfree(appname); + pfree(keywords); + pfree(values); + PG_RETURN_TEXT_P(cstring_to_text(str.data)); +} + /* * List active foreign server connections. * diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 2ccb72c539a..0f5271d476e 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -255,6 +255,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again ANALYZE ft1; ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true'); -- =================================================================== +-- test subscription +-- =================================================================== +CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1 + PUBLICATION pub1 WITH (slot_name = NONE, connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +DROP SUBSCRIPTION regress_pgfdw_subscription; +-- =================================================================== -- test error case for create publication on foreign table -- =================================================================== CREATE PUBLICATION testpub_ftbl FOR TABLE ft1; -- should fail diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build index ea4cd9fcd46..3e2ed06b766 100644 --- a/contrib/postgres_fdw/meson.build +++ b/contrib/postgres_fdw/meson.build @@ -27,6 +27,7 @@ install_data( 'postgres_fdw--1.0.sql', 'postgres_fdw--1.0--1.1.sql', 'postgres_fdw--1.1--1.2.sql', + 'postgres_fdw--1.2--1.3.sql', kwargs: contrib_data_args, ) @@ -50,6 +51,7 @@ tests += { 'tap': { 'tests': [ 't/001_auth_scram.pl', + 't/010_subscription.pl', ], }, } diff --git a/contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql b/contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql new file mode 100644 index 00000000000..5bcf0ba2e09 --- /dev/null +++ b/contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql @@ -0,0 +1,12 @@ +/* contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.3'" to load this file. \quit + +-- takes internal parameter to prevent calling from SQL +CREATE FUNCTION postgres_fdw_connection(oid, oid, internal) +RETURNS text +AS 'MODULE_PATHNAME' +LANGUAGE C STRICT PARALLEL RESTRICTED; + +ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection; diff --git a/contrib/postgres_fdw/postgres_fdw.control b/contrib/postgres_fdw/postgres_fdw.control index a4b800be4fc..ae2963d480d 100644 --- a/contrib/postgres_fdw/postgres_fdw.control +++ b/contrib/postgres_fdw/postgres_fdw.control @@ -1,5 +1,5 @@ # postgres_fdw extension comment = 'foreign-data wrapper for remote PostgreSQL servers' -default_version = '1.2' +default_version = '1.3' module_pathname = '$libdir/postgres_fdw' relocatable = true diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 72d2d9c311b..49ed797e8ef 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -244,6 +244,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again ANALYZE ft1; ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true'); +-- =================================================================== +-- test subscription +-- =================================================================== +CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1 + PUBLICATION pub1 WITH (slot_name = NONE, connect = false); +DROP SUBSCRIPTION regress_pgfdw_subscription; + -- =================================================================== -- test error case for create publication on foreign table -- =================================================================== diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl new file mode 100644 index 00000000000..1e41091badc --- /dev/null +++ b/contrib/postgres_fdw/t/010_subscription.pl @@ -0,0 +1,71 @@ + +# Copyright (c) 2021-2026, PostgreSQL Global Development Group + +# Basic logical replication test +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a"); + +# Replicate the changes without columns +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_no_col default VALUES"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins"); + +my $publisher_host = $node_publisher->host; +my $publisher_port = $node_publisher->port; +$node_subscriber->safe_psql('postgres', + "CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')" +); + +$node_subscriber->safe_psql('postgres', + "CREATE USER MAPPING FOR PUBLIC SERVER tap_server" +); + +$node_subscriber->safe_psql('postgres', + "CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')" +); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match"); +is($result, qq(1002), 'check that initial data was copied to subscriber'); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a"); + +$node_publisher->wait_for_catchup('tap_sub'); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match"); +is($result, qq(1050), 'check that inserted data was copied to subscriber'); + +done_testing(); diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index bcb473c078b..72c8d3d59bd 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -2577,7 +2577,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER To create a subscription, the user must have the privileges of the pg_create_subscription role, as well as - CREATE privileges on the database. + CREATE privileges on the database. If + SERVER is specified, the user also must have + USAGE privileges on the server. diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index fcf10e4317e..de69ddcdebc 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -1049,6 +1049,32 @@ postgres=# SELECT postgres_fdw_disconnect_all(); + + Subscription Management + + + postgres_fdw supports subscription connections using + the same options described in . + + + + For example, assuming the remote server foreign-host has + a publication testpub: + +CREATE SERVER subscription_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'foreign-host', dbname 'foreign_db'); +CREATE USER MAPPING FOR local_user SERVER subscription_server OPTIONS (user 'foreign_user', password 'password'); +CREATE SUBSCRIPTION my_subscription SERVER subscription_server PUBLICATION testpub; + + + + + To create a subscription, the user must be a member of the role and have + USAGE privileges on the server. + + + Transaction Management diff --git a/doc/src/sgml/ref/alter_foreign_data_wrapper.sgml b/doc/src/sgml/ref/alter_foreign_data_wrapper.sgml index dc0957d965a..640c02893cf 100644 --- a/doc/src/sgml/ref/alter_foreign_data_wrapper.sgml +++ b/doc/src/sgml/ref/alter_foreign_data_wrapper.sgml @@ -24,6 +24,7 @@ PostgreSQL documentation ALTER FOREIGN DATA WRAPPER name [ HANDLER handler_function | NO HANDLER ] [ VALIDATOR validator_function | NO VALIDATOR ] + [ CONNECTION connection_function | NO CONNECTION ] [ OPTIONS ( [ ADD | SET | DROP ] option ['value'] [, ... ]) ] ALTER FOREIGN DATA WRAPPER name OWNER TO { new_owner | CURRENT_ROLE | CURRENT_USER | SESSION_USER } ALTER FOREIGN DATA WRAPPER name RENAME TO new_name @@ -112,6 +113,25 @@ ALTER FOREIGN DATA WRAPPER name REN + + CONNECTION connection_function + + + Specifies a new connection function for the foreign-data wrapper. + + + + + + NO CONNECTION + + + This is used to specify that the foreign-data wrapper should no + longer have a connection function. + + + + OPTIONS ( [ ADD | SET | DROP ] option ['value'] [, ... ] ) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 5318998e80c..f215fb0e5a2 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -21,6 +21,7 @@ PostgreSQL documentation +ALTER SUBSCRIPTION name SERVER servername ALTER SUBSCRIPTION name CONNECTION 'conninfo' ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ] ALTER SUBSCRIPTION name ADD PUBLICATION publication_name [, ...] [ WITH ( publication_option [= value] [, ... ] ) ] @@ -102,13 +103,24 @@ ALTER SUBSCRIPTION name RENAME TO < + + SERVER servername + + + This clause replaces the foreign server or connection string originally + set by with the foreign server + servername. + + + + CONNECTION 'conninfo' - This clause replaces the connection string originally set by - . See there for more - information. + This clause replaces the foreign server or connection string originally + set by with the connection + string conninfo. diff --git a/doc/src/sgml/ref/create_foreign_data_wrapper.sgml b/doc/src/sgml/ref/create_foreign_data_wrapper.sgml index 0fcba18a347..7b83f500b25 100644 --- a/doc/src/sgml/ref/create_foreign_data_wrapper.sgml +++ b/doc/src/sgml/ref/create_foreign_data_wrapper.sgml @@ -24,6 +24,7 @@ PostgreSQL documentation CREATE FOREIGN DATA WRAPPER name [ HANDLER handler_function | NO HANDLER ] [ VALIDATOR validator_function | NO VALIDATOR ] + [ CONNECTION connection_function | NO CONNECTION ] [ OPTIONS ( option 'value' [, ... ] ) ] @@ -99,6 +100,25 @@ CREATE FOREIGN DATA WRAPPER name + + CONNECTION connection_function + + + connection_function is the + name of a previously registered function that will be called to generate + the postgres connection string when a foreign server is used as part of + . If no connection function or + NO CONNECTION is specified, then servers using this + foreign data wrapper cannot be used for CREATE + SUBSCRIPTION. The connection function must take three + arguments: one of type oid for the user, one of type + oid for the server, and an unused third argument of type + internal (which prevents calling the function in other + contexts). + + + + OPTIONS ( option 'value' [, ... ] ) diff --git a/doc/src/sgml/ref/create_server.sgml b/doc/src/sgml/ref/create_server.sgml index 05f4019453b..ce4a064eabb 100644 --- a/doc/src/sgml/ref/create_server.sgml +++ b/doc/src/sgml/ref/create_server.sgml @@ -42,6 +42,13 @@ CREATE SERVER [ IF NOT EXISTS ] server_name + + If the foreign data wrapper fdw_name is + specified with a CONNECTION clause, then may use this foreign server for + connection information. + + The server name must be unique within the database. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index eb0cc645d8f..07d5b1bd77c 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -22,7 +22,7 @@ PostgreSQL documentation CREATE SUBSCRIPTION subscription_name - CONNECTION 'conninfo' + { SERVER servername | CONNECTION 'conninfo' } PUBLICATION publication_name [, ...] [ WITH ( subscription_parameter [= value] [, ... ] ) ] @@ -77,6 +77,20 @@ CREATE SUBSCRIPTION subscription_name + + SERVER servername + + + A foreign server to use for the connection. The server's foreign data + wrapper must have a connection_function + registered, and a user mapping for the subscription owner on the server + must exist. Additionally, the subscription owner must have + USAGE privileges on + servername. + + + + CONNECTION 'conninfo' diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 570c434ede8..09575278de3 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -895,6 +895,17 @@ findDependentObjects(const ObjectAddress *object, object->objectSubId == 0) continue; + /* + * Check that the dependent object is not in a shared catalog, which + * is not supported by doDeletion(). + */ + if (IsSharedRelation(otherObject.classId)) + ereport(ERROR, + (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST), + errmsg("cannot drop %s because %s depends on it", + getObjectDescription(object, false), + getObjectDescription(&otherObject, false)))); + /* * Must lock the dependent object before recursing to it. */ diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index acf42b853ed..3673d4f0bc1 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -19,11 +19,14 @@ #include "access/htup_details.h" #include "access/tableam.h" #include "catalog/indexing.h" +#include "catalog/pg_foreign_server.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "storage/lmgr.h" +#include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -69,7 +72,7 @@ GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal) * Fetch the subscription from the syscache. */ Subscription * -GetSubscription(Oid subid, bool missing_ok) +GetSubscription(Oid subid, bool missing_ok, bool aclcheck) { HeapTuple tup; Subscription *sub; @@ -108,10 +111,35 @@ GetSubscription(Oid subid, bool missing_ok) sub->retentionactive = subform->subretentionactive; /* Get conninfo */ - datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, - tup, - Anum_pg_subscription_subconninfo); - sub->conninfo = TextDatumGetCString(datum); + if (OidIsValid(subform->subserver)) + { + AclResult aclresult; + + /* recheck ACL if requested */ + if (aclcheck) + { + aclresult = object_aclcheck(ForeignServerRelationId, + subform->subserver, + subform->subowner, ACL_USAGE); + + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(subform->subowner, false), + ForeignServerName(subform->subserver)))); + } + + sub->conninfo = ForeignServerConnectionString(subform->subowner, + subform->subserver); + } + else + { + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_subconninfo); + sub->conninfo = TextDatumGetCString(datum); + } /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2eda7d80d02..ecb7c996e86 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1449,7 +1449,7 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, subfailover, subretaindeadtuples, submaxretention, subretentionactive, - subslotname, subsynccommit, subpublications, suborigin) + subserver, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index b56d1ad6785..45681235782 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -522,21 +522,53 @@ lookup_fdw_validator_func(DefElem *validator) /* validator's return value is ignored, so we don't check the type */ } +/* + * Convert a connection string function name passed from the parser to an Oid. + */ +static Oid +lookup_fdw_connection_func(DefElem *connection) +{ + Oid connectionOid; + Oid funcargtypes[3]; + + if (connection == NULL || connection->arg == NULL) + return InvalidOid; + + /* connection string functions take user oid, server oid */ + funcargtypes[0] = OIDOID; + funcargtypes[1] = OIDOID; + funcargtypes[2] = INTERNALOID; + + connectionOid = LookupFuncName((List *) connection->arg, 3, funcargtypes, false); + + /* check that connection string function has correct return type */ + if (get_func_rettype(connectionOid) != TEXTOID) + ereport(ERROR, + (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("function %s must return type %s", + NameListToString((List *) connection->arg), "text"))); + + return connectionOid; +} + /* * Process function options of CREATE/ALTER FDW */ static void parse_func_options(ParseState *pstate, List *func_options, bool *handler_given, Oid *fdwhandler, - bool *validator_given, Oid *fdwvalidator) + bool *validator_given, Oid *fdwvalidator, + bool *connection_given, Oid *fdwconnection) { ListCell *cell; *handler_given = false; *validator_given = false; + *connection_given = false; /* return InvalidOid if not given */ *fdwhandler = InvalidOid; *fdwvalidator = InvalidOid; + *fdwconnection = InvalidOid; foreach(cell, func_options) { @@ -556,6 +588,13 @@ parse_func_options(ParseState *pstate, List *func_options, *validator_given = true; *fdwvalidator = lookup_fdw_validator_func(def); } + else if (strcmp(def->defname, "connection") == 0) + { + if (*connection_given) + errorConflictingDefElem(def, pstate); + *connection_given = true; + *fdwconnection = lookup_fdw_connection_func(def); + } else elog(ERROR, "option \"%s\" not recognized", def->defname); @@ -575,8 +614,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt) Oid fdwId; bool handler_given; bool validator_given; + bool connection_given; Oid fdwhandler; Oid fdwvalidator; + Oid fdwconnection; Datum fdwoptions; Oid ownerId; ObjectAddress myself; @@ -620,10 +661,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt) /* Lookup handler and validator functions, if given */ parse_func_options(pstate, stmt->func_options, &handler_given, &fdwhandler, - &validator_given, &fdwvalidator); + &validator_given, &fdwvalidator, + &connection_given, &fdwconnection); values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler); values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator); + values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection); nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true; @@ -695,8 +738,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) Datum datum; bool handler_given; bool validator_given; + bool connection_given; Oid fdwhandler; Oid fdwvalidator; + Oid fdwconnection; ObjectAddress myself; rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock); @@ -726,7 +771,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) parse_func_options(pstate, stmt->func_options, &handler_given, &fdwhandler, - &validator_given, &fdwvalidator); + &validator_given, &fdwvalidator, + &connection_given, &fdwconnection); if (handler_given) { @@ -764,6 +810,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt) fdwvalidator = fdwForm->fdwvalidator; } + if (connection_given) + { + repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection); + repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true; + } + /* * If options specified, validate and update. */ diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5e3c0964d38..9e21d7a7df9 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -27,13 +27,16 @@ #include "catalog/objectaddress.h" #include "catalog/pg_authid_d.h" #include "catalog/pg_database_d.h" +#include "catalog/pg_foreign_server.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" +#include "catalog/pg_user_mapping.h" #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/subscriptioncmds.h" #include "executor/executor.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" @@ -619,6 +622,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, Datum values[Natts_pg_subscription]; Oid owner = GetUserId(); HeapTuple tup; + Oid serverid; char *conninfo; char originname[NAMEDATALEN]; List *publications; @@ -730,15 +734,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.wal_receiver_timeout == NULL) opts.wal_receiver_timeout = "-1"; - conninfo = stmt->conninfo; - publications = stmt->publication; - /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); + if (stmt->servername) + { + ForeignServer *server; + + Assert(!stmt->conninfo); + conninfo = NULL; + + server = GetForeignServerByName(stmt->servername, false); + aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername); + + /* make sure a user mapping exists */ + GetUserMapping(owner, server->serverid); + + serverid = server->serverid; + conninfo = ForeignServerConnectionString(owner, serverid); + } + else + { + Assert(stmt->conninfo); + + serverid = InvalidOid; + conninfo = stmt->conninfo; + } + /* Check the connection info string. */ walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser()); + publications = stmt->publication; + /* Everything ok, form a new tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -768,8 +797,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, Int32GetDatum(opts.maxretention); values[Anum_pg_subscription_subretentionactive - 1] = Int32GetDatum(opts.retaindeadtuples); - values[Anum_pg_subscription_subconninfo - 1] = - CStringGetTextDatum(conninfo); + values[Anum_pg_subscription_subserver - 1] = serverid; + if (!OidIsValid(serverid)) + values[Anum_pg_subscription_subconninfo - 1] = + CStringGetTextDatum(conninfo); + else + nulls[Anum_pg_subscription_subconninfo - 1] = true; if (opts.slot_name) values[Anum_pg_subscription_subslotname - 1] = DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name)); @@ -792,6 +825,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, recordDependencyOnOwner(SubscriptionRelationId, subid, owner); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + + if (stmt->servername) + { + ObjectAddress referenced; + + Assert(OidIsValid(serverid)); + + ObjectAddressSet(referenced, ForeignServerRelationId, serverid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + } + /* * A replication origin is currently created for all subscriptions, * including those that only contain sequences or are otherwise empty. @@ -945,8 +990,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.enabled || opts.retaindeadtuples) ApplyLauncherWakeupAtCommit(); - ObjectAddressSet(myself, SubscriptionRelationId, subid); - InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0); return myself; @@ -1410,7 +1453,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION, stmt->subname); - sub = GetSubscription(subid, false); + /* + * Skip ACL checks on the subscription's foreign server, if any. If + * changing the server (or replacing it with a raw connection), then the + * old one will be removed anyway. If changing something unrelated, + * there's no need to do an additional ACL check here; that will be done + * by the subscription worker anyway. + */ + sub = GetSubscription(subid, false, false); retain_dead_tuples = sub->retaindeadtuples; origin = sub->origin; @@ -1435,6 +1485,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + switch (stmt->kind) { case ALTER_SUBSCRIPTION_OPTIONS: @@ -1753,7 +1805,78 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, break; } + case ALTER_SUBSCRIPTION_SERVER: + { + ForeignServer *new_server; + ObjectAddress referenced; + AclResult aclresult; + char *conninfo; + + /* + * Remove what was there before, either another foreign server + * or a connection string. + */ + if (form->subserver) + { + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + ForeignServerRelationId, form->subserver); + } + else + { + nulls[Anum_pg_subscription_subconninfo - 1] = true; + replaces[Anum_pg_subscription_subconninfo - 1] = true; + } + + /* + * Check that the subscription owner has USAGE privileges on + * the server. + */ + new_server = GetForeignServerByName(stmt->servername, false); + aclresult = object_aclcheck(ForeignServerRelationId, + new_server->serverid, + form->subowner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(form->subowner, false), + ForeignServerName(new_server->serverid))); + + /* make sure a user mapping exists */ + GetUserMapping(form->subowner, new_server->serverid); + + conninfo = ForeignServerConnectionString(form->subowner, + new_server->serverid); + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + /* Check the connection info string. */ + walrcv_check_conninfo(conninfo, + sub->passwordrequired && !sub->ownersuperuser); + + values[Anum_pg_subscription_subserver - 1] = new_server->serverid; + replaces[Anum_pg_subscription_subserver - 1] = true; + + ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL); + + update_tuple = true; + } + break; + case ALTER_SUBSCRIPTION_CONNECTION: + /* remove reference to foreign server and dependencies, if present */ + if (form->subserver) + { + deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid, + DEPENDENCY_NORMAL, + ForeignServerRelationId, form->subserver); + + values[Anum_pg_subscription_subserver - 1] = InvalidOid; + replaces[Anum_pg_subscription_subserver - 1] = true; + } + /* Load the library providing us libpq calls. */ load_file("libpqwalreceiver", false); /* Check the connection info string. */ @@ -2038,8 +2161,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, table_close(rel, RowExclusiveLock); - ObjectAddressSet(myself, SubscriptionRelationId, subid); - InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); /* Wake up related replication workers to handle this change quickly. */ @@ -2068,7 +2189,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) ListCell *lc; char originname[NAMEDATALEN]; char *err = NULL; - WalReceiverConn *wrconn; + WalReceiverConn *wrconn = NULL; Form_pg_subscription form; List *rstates; bool must_use_password; @@ -2126,9 +2247,35 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) subname = pstrdup(NameStr(*DatumGetName(datum))); /* Get conninfo */ - datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, - Anum_pg_subscription_subconninfo); - conninfo = TextDatumGetCString(datum); + if (OidIsValid(form->subserver)) + { + AclResult aclresult; + + aclresult = object_aclcheck(ForeignServerRelationId, form->subserver, + form->subowner, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + { + /* + * Unable to generate connection string because permissions on the + * foreign server have been removed. Follow the same logic as an + * unusable subconninfo (which will result in an ERROR later + * unless slot_name = NONE). + */ + err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""), + GetUserNameFromId(form->subowner, false), + ForeignServerName(form->subserver)); + conninfo = NULL; + } + else + conninfo = ForeignServerConnectionString(form->subowner, + form->subserver); + } + else + { + datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup, + Anum_pg_subscription_subconninfo); + conninfo = TextDatumGetCString(datum); + } /* Get slotname */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, @@ -2227,6 +2374,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } /* Clean up dependencies */ + deleteDependencyRecordsFor(SubscriptionRelationId, subid, false); deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); /* Remove any associated relation synchronization states. */ @@ -2265,8 +2413,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) */ load_file("libpqwalreceiver", false); - wrconn = walrcv_connect(conninfo, true, true, must_use_password, - subname, &err); + if (conninfo) + wrconn = walrcv_connect(conninfo, true, true, must_use_password, + subname, &err); + if (wrconn == NULL) { if (!slotname) @@ -2436,6 +2586,27 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) aclcheck_error(aclresult, OBJECT_DATABASE, get_database_name(MyDatabaseId)); + /* + * If the subscription uses a server, check that the new owner has USAGE + * privileges on the server and that a user mapping exists. Note: does not + * re-check the resulting connection string. + */ + if (OidIsValid(form->subserver)) + { + Oid serverid = form->subserver; + + aclresult = object_aclcheck(ForeignServerRelationId, serverid, newOwnerId, ACL_USAGE); + if (aclresult != ACLCHECK_OK) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"", + GetUserNameFromId(newOwnerId, false), + ForeignServerName(serverid))); + + /* make sure a user mapping exists */ + GetUserMapping(newOwnerId, serverid); + } + form->subowner = newOwnerId; CatalogTupleUpdate(rel, &tup->t_self, tup); diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index b912a06dd15..c53699959ea 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -72,6 +72,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags) fdw->fdwname = pstrdup(NameStr(fdwform->fdwname)); fdw->fdwhandler = fdwform->fdwhandler; fdw->fdwvalidator = fdwform->fdwvalidator; + fdw->fdwconnection = fdwform->fdwconnection; /* Extract the fdwoptions */ datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID, @@ -176,6 +177,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags) } +/* + * ForeignServerName - get name of foreign server. + */ +char * +ForeignServerName(Oid serverid) +{ + Form_pg_foreign_server serverform; + char *servername; + HeapTuple tp; + + tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid)); + + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for foreign server %u", serverid); + + serverform = (Form_pg_foreign_server) GETSTRUCT(tp); + + servername = pstrdup(NameStr(serverform->srvname)); + + ReleaseSysCache(tp); + + return servername; +} + + /* * GetForeignServerByName - look up the foreign server definition by name. */ @@ -191,6 +217,66 @@ GetForeignServerByName(const char *srvname, bool missing_ok) } +/* + * Retrieve connection string from server's FDW. + */ +char * +ForeignServerConnectionString(Oid userid, Oid serverid) +{ + MemoryContext tempContext; + MemoryContext oldcxt; + volatile text *connection_text = NULL; + char *result = NULL; + + /* + * GetForeignServer, GetForeignDataWrapper, and the connection function + * itself all leak memory into CurrentMemoryContext. Switch to a temporary + * context for easy cleanup. + */ + tempContext = AllocSetContextCreate(CurrentMemoryContext, + "FDWConnectionContext", + ALLOCSET_SMALL_SIZES); + + oldcxt = MemoryContextSwitchTo(tempContext); + + PG_TRY(); + { + ForeignServer *server; + ForeignDataWrapper *fdw; + Datum connection_datum; + + server = GetForeignServer(serverid); + fdw = GetForeignDataWrapper(server->fdwid); + + if (!OidIsValid(fdw->fdwconnection)) + ereport(ERROR, + (errmsg("foreign data wrapper \"%s\" does not support subscription connections", + fdw->fdwname), + errdetail("Foreign data wrapper must be defined with CONNECTION specified."))); + + + connection_datum = OidFunctionCall3(fdw->fdwconnection, + ObjectIdGetDatum(userid), + ObjectIdGetDatum(serverid), + PointerGetDatum(NULL)); + + connection_text = DatumGetTextPP(connection_datum); + } + PG_FINALLY(); + { + MemoryContextSwitchTo(oldcxt); + + if (connection_text) + result = text_to_cstring((text *) connection_text); + + MemoryContextDelete(tempContext); + } + PG_END_TRY(); + + return result; +} + + /* * GetUserMapping - look up the user mapping. * diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 3c3e24324a8..9cbe8eafc45 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -5583,6 +5583,8 @@ fdw_option: | NO HANDLER { $$ = makeDefElem("handler", NULL, @1); } | VALIDATOR handler_name { $$ = makeDefElem("validator", (Node *) $2, @1); } | NO VALIDATOR { $$ = makeDefElem("validator", NULL, @1); } + | CONNECTION handler_name { $$ = makeDefElem("connection", (Node *) $2, @1); } + | NO CONNECTION { $$ = makeDefElem("connection", NULL, @1); } ; fdw_options: @@ -11057,6 +11059,16 @@ CreateSubscriptionStmt: n->options = $8; $$ = (Node *) n; } + | CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition + { + CreateSubscriptionStmt *n = + makeNode(CreateSubscriptionStmt); + n->subname = $3; + n->servername = $5; + n->publication = $7; + n->options = $8; + $$ = (Node *) n; + } ; /***************************************************************************** @@ -11086,6 +11098,16 @@ AlterSubscriptionStmt: n->conninfo = $5; $$ = (Node *) n; } + | ALTER SUBSCRIPTION name SERVER name + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + + n->kind = ALTER_SUBSCRIPTION_SERVER; + n->subname = $3; + n->servername = $5; + $$ = (Node *) n; + } | ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition { AlterSubscriptionStmt *n = diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 274dc71c6a1..3f7de2efce0 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -5059,7 +5059,7 @@ maybe_reread_subscription(void) /* Ensure allocations in permanent context. */ oldctx = MemoryContextSwitchTo(ApplyContext); - newsub = GetSubscription(MyLogicalRepWorker->subid, true); + newsub = GetSubscription(MyLogicalRepWorker->subid, true, true); /* * Exit if the subscription was removed. This normally should not happen @@ -5201,7 +5201,9 @@ set_wal_receiver_timeout(void) } /* - * Callback from subscription syscache invalidation. + * Callback from subscription syscache invalidation. Also needed for server or + * user mapping invalidation, which can change the connection information for + * subscriptions that connect using a server object. */ static void subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue) @@ -5806,7 +5808,7 @@ InitializeLogRepWorker(void) */ LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0, AccessShareLock); - MySubscription = GetSubscription(MyLogicalRepWorker->subid, true); + MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true); if (!MySubscription) { ereport(LOG, @@ -5871,6 +5873,22 @@ InitializeLogRepWorker(void) CacheRegisterSyscacheCallback(SUBSCRIPTIONOID, subscription_change_cb, (Datum) 0); + /* Changes to foreign servers may affect subscriptions using SERVER. */ + CacheRegisterSyscacheCallback(FOREIGNSERVEROID, + subscription_change_cb, + (Datum) 0); + /* Changes to user mappings may affect subscriptions using SERVER. */ + CacheRegisterSyscacheCallback(USERMAPPINGOID, + subscription_change_cb, + (Datum) 0); + + /* + * Changes to FDW connection_function may affect subscriptions using + * SERVER. + */ + CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID, + subscription_change_cb, + (Datum) 0); CacheRegisterSyscacheCallback(AUTHOID, subscription_change_cb, diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 1035bba72ce..88e0a55f8f1 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -5182,6 +5182,7 @@ getSubscriptions(Archive *fout) int i_subdisableonerr; int i_subpasswordrequired; int i_subrunasowner; + int i_subservername; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -5286,14 +5287,24 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 190000) appendPQExpBufferStr(query, - " s.subwalrcvtimeout\n"); + " s.subwalrcvtimeout,\n"); else appendPQExpBufferStr(query, - " '-1' AS subwalrcvtimeout\n"); + " '-1' AS subwalrcvtimeout,\n"); + + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, " fs.srvname AS subservername\n"); + else + appendPQExpBufferStr(query, " NULL AS subservername\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n"); + if (fout->remoteVersion >= 190000) + appendPQExpBufferStr(query, + "LEFT JOIN pg_catalog.pg_foreign_server fs \n" + " ON fs.oid = s.subserver \n"); + if (dopt->binary_upgrade && fout->remoteVersion >= 170000) appendPQExpBufferStr(query, "LEFT JOIN pg_catalog.pg_replication_origin_status o \n" @@ -5325,6 +5336,7 @@ getSubscriptions(Archive *fout) i_subfailover = PQfnumber(res, "subfailover"); i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples"); i_submaxretention = PQfnumber(res, "submaxretention"); + i_subservername = PQfnumber(res, "subservername"); i_subconninfo = PQfnumber(res, "subconninfo"); i_subslotname = PQfnumber(res, "subslotname"); i_subsynccommit = PQfnumber(res, "subsynccommit"); @@ -5347,6 +5359,10 @@ getSubscriptions(Archive *fout) subinfo[i].subenabled = (strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0); + if (PQgetisnull(res, i, i_subservername)) + subinfo[i].subservername = NULL; + else + subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername)); subinfo[i].subbinary = (strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0); subinfo[i].substream = *(PQgetvalue(res, i, i_substream)); @@ -5363,8 +5379,11 @@ getSubscriptions(Archive *fout) (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0); subinfo[i].submaxretention = atoi(PQgetvalue(res, i, i_submaxretention)); - subinfo[i].subconninfo = - pg_strdup(PQgetvalue(res, i, i_subconninfo)); + if (PQgetisnull(res, i, i_subconninfo)) + subinfo[i].subconninfo = NULL; + else + subinfo[i].subconninfo = + pg_strdup(PQgetvalue(res, i, i_subconninfo)); if (PQgetisnull(res, i, i_subslotname)) subinfo[i].subslotname = NULL; else @@ -5575,9 +5594,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n", qsubname); - appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ", + appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ", qsubname); - appendStringLiteralAH(query, subinfo->subconninfo, fout); + if (subinfo->subservername) + { + appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername)); + } + else + { + appendPQExpBuffer(query, "CONNECTION "); + appendStringLiteralAH(query, subinfo->subconninfo, fout); + } /* Build list of quoted publications and append them to query. */ if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames)) diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index e138ef1276c..1c11a79083f 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -720,6 +720,7 @@ typedef struct _SubscriptionInfo bool subfailover; bool subretaindeadtuples; int submaxretention; + char *subservername; char *subconninfo; char *subslotname; char *subsynccommit; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index a94eade282f..211d8f3b1ec 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6895,7 +6895,7 @@ describeSubscriptions(const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false, false, false, false, false, false, - false, false, false, false, false, false}; + false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6965,6 +6965,10 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Failover")); if (pset.sversion >= 190000) { + appendPQExpBuffer(&buf, + ", (select srvname from pg_foreign_server where oid=subserver) AS \"%s\"\n", + gettext_noop("Server")); + appendPQExpBuffer(&buf, ", subretaindeadtuples AS \"%s\"\n", gettext_noop("Retain dead tuples")); diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index f8c0865ca89..6484c6a3dd4 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -2332,7 +2332,7 @@ match_previous_words(int pattern_id, else if (Matches("ALTER", "SUBSCRIPTION", MatchAny)) COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO", "RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES", - "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION"); + "SERVER", "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION"); /* ALTER SUBSCRIPTION REFRESH */ else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH")) COMPLETE_WITH("PUBLICATION", "SEQUENCES"); @@ -3870,9 +3870,16 @@ match_previous_words(int pattern_id, /* CREATE SUBSCRIPTION */ else if (Matches("CREATE", "SUBSCRIPTION", MatchAny)) - COMPLETE_WITH("CONNECTION"); + COMPLETE_WITH("SERVER", "CONNECTION"); + else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER", MatchAny)) + COMPLETE_WITH("PUBLICATION"); else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny)) COMPLETE_WITH("PUBLICATION"); + else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER", + MatchAny, "PUBLICATION")) + { + /* complete with nothing here as this refers to remote publications */ + } else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny, "PUBLICATION")) { diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index b863edfabda..b6508b60a84 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -57,6 +57,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 202603061 +#define CATALOG_VERSION_NO 202603062 #endif diff --git a/src/include/catalog/pg_foreign_data_wrapper.h b/src/include/catalog/pg_foreign_data_wrapper.h index e6009069e82..3d8389de65e 100644 --- a/src/include/catalog/pg_foreign_data_wrapper.h +++ b/src/include/catalog/pg_foreign_data_wrapper.h @@ -38,6 +38,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId) Oid fdwvalidator BKI_LOOKUP_OPT(pg_proc); /* option validation * function, or 0 if * none */ + Oid fdwconnection BKI_LOOKUP_OPT(pg_proc); /* connection string + * function, or 0 if + * none */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ aclitem fdwacl[1]; /* access permissions */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index c369b5abfb3..0058d9387d7 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -92,9 +92,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW * exceeded max_retention_duration, when * defined */ + Oid subserver BKI_LOOKUP_OPT(pg_foreign_server); /* If connection uses + * server */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ - text subconninfo BKI_FORCE_NOT_NULL; + text subconninfo; /* Set if connecting with connection string */ /* Slot name on publisher */ NameData subslotname BKI_FORCE_NULL; @@ -207,7 +210,8 @@ typedef struct Subscription #endif /* EXPOSE_TO_CLIENT_CODE */ -extern Subscription *GetSubscription(Oid subid, bool missing_ok); +extern Subscription *GetSubscription(Oid subid, bool missing_ok, + bool aclcheck); extern void FreeSubscription(Subscription *sub); extern void DisableSubscription(Oid subid); diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h index c185d1458a2..65ed9a7f987 100644 --- a/src/include/foreign/foreign.h +++ b/src/include/foreign/foreign.h @@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper char *fdwname; /* Name of the FDW */ Oid fdwhandler; /* Oid of handler function, or 0 */ Oid fdwvalidator; /* Oid of validator function, or 0 */ + Oid fdwconnection; /* Oid of connection string function, or 0 */ List *options; /* fdwoptions as DefElem list */ } ForeignDataWrapper; @@ -65,10 +66,12 @@ typedef struct ForeignTable extern ForeignServer *GetForeignServer(Oid serverid); +extern char *ForeignServerName(Oid serverid); extern ForeignServer *GetForeignServerExtended(Oid serverid, bits16 flags); extern ForeignServer *GetForeignServerByName(const char *srvname, bool missing_ok); +extern char *ForeignServerConnectionString(Oid userid, Oid serverid); extern UserMapping *GetUserMapping(Oid userid, Oid serverid); extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid); extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index ff41943a6db..4ee092206b0 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4383,6 +4383,7 @@ typedef struct CreateSubscriptionStmt { NodeTag type; char *subname; /* Name of the subscription */ + char *servername; /* Server name of publisher */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ @@ -4391,6 +4392,7 @@ typedef struct CreateSubscriptionStmt typedef enum AlterSubscriptionType { ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_SERVER, ALTER_SUBSCRIPTION_CONNECTION, ALTER_SUBSCRIPTION_SET_PUBLICATION, ALTER_SUBSCRIPTION_ADD_PUBLICATION, @@ -4406,6 +4408,7 @@ typedef struct AlterSubscriptionStmt NodeTag type; AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */ char *subname; /* Name of the subscription */ + char *servername; /* Server name of publisher */ char *conninfo; /* Connection string to publisher */ List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 25aaae8d05a..51b9608a668 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -224,6 +224,7 @@ NOTICE: checking pg_extension {extconfig} => pg_class {oid} NOTICE: checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid} NOTICE: checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid} NOTICE: checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid} +NOTICE: checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid} NOTICE: checking pg_foreign_server {srvowner} => pg_authid {oid} NOTICE: checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid} NOTICE: checking pg_user_mapping {umuser} => pg_authid {oid} @@ -269,5 +270,6 @@ NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid} +NOTICE: checking pg_subscription {subserver} => pg_foreign_server {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid} diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index a5fdfe68a0e..f57f359127b 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -1,6 +1,14 @@ -- -- SUBSCRIPTION -- +-- directory paths and dlsuffix are passed to us in environment variables +\getenv libdir PG_LIBDIR +\getenv dlsuffix PG_DLSUFFIX +\set regresslib :libdir '/regress' :dlsuffix +CREATE FUNCTION test_fdw_connection(oid, oid, internal) + RETURNS text + AS :'regresslib', 'test_fdw_connection' + LANGUAGE C; CREATE ROLE regress_subscription_user LOGIN SUPERUSER; CREATE ROLE regress_subscription_user2; CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription; @@ -116,18 +124,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -140,15 +148,53 @@ ERROR: invalid connection string syntax: invalid connection option "i_dont_exis -- connecting, so this is reliable and safe) CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub; ERROR: subscription "regress_testsub5" could not connect to the publisher: invalid port number: "-1" +CREATE FOREIGN DATA WRAPPER test_fdw; +CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw; +GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; +-- fail, need USAGE privileges on server +CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false); +ERROR: permission denied for foreign server test_server +RESET SESSION AUTHORIZATION; +GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; +-- fail, need user mapping +CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false); +ERROR: user mapping not found for user "regress_subscription_user3", server "test_server" +CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret'); +-- fail, need CONNECTION clause +CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false); +ERROR: foreign data wrapper "test_fdw" does not support subscription connections +DETAIL: Foreign data wrapper must be defined with CONNECTION specified. +RESET SESSION AUTHORIZATION; +ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection; +SET SESSION AUTHORIZATION regress_subscription_user3; +CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. +DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server; +RESET SESSION AUTHORIZATION; +REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; +-- fail, must connect but lacks USAGE on server, as well as user mapping +DROP SUBSCRIPTION regress_testsub6; +ERROR: could not connect to publisher when attempting to drop replication slot "dummy": subscription owner "regress_subscription_user3" does not have permission on foreign server "test_server" +HINT: Use ALTER SUBSCRIPTION ... DISABLE to disable the subscription, and then use ALTER SUBSCRIPTION ... SET (slot_name = NONE) to disassociate it from the slot. +ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub6; +SET SESSION AUTHORIZATION regress_subscription_user; +REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3; +DROP SERVER test_server; +DROP FOREIGN DATA WRAPPER test_fdw; -- fail - invalid connection string during ALTER ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | test subscription + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | test subscription (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +203,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +222,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00012345 | test subscription + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00012345 | test subscription (1 row) -- ok - with lsn = NONE @@ -188,10 +234,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription (1 row) BEGIN; @@ -227,10 +273,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = '80s'); ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = 'foobar'); ERROR: invalid value for parameter "wal_receiver_timeout": "foobar" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 80s | 0/00000000 | test subscription + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+------------------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | local | dbname=regress_doesnotexist2 | 80s | 0/00000000 | test subscription (1 row) -- rename back to keep the rest simple @@ -259,19 +305,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub; @@ -283,27 +329,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) -- fail - publication already exists @@ -318,10 +364,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) -- fail - publication used more than once @@ -336,10 +382,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) DROP SUBSCRIPTION regress_testsub; @@ -375,19 +421,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) -- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -397,10 +443,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -413,18 +459,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -437,10 +483,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -454,19 +500,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 1000 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) -- ok ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/regress.c b/src/test/regress/regress.c index a02f41c9727..68a01a1dde0 100644 --- a/src/test/regress/regress.c +++ b/src/test/regress/regress.c @@ -729,6 +729,13 @@ test_fdw_handler(PG_FUNCTION_ARGS) PG_RETURN_NULL(); } +PG_FUNCTION_INFO_V1(test_fdw_connection); +Datum +test_fdw_connection(PG_FUNCTION_ARGS) +{ + PG_RETURN_TEXT_P(cstring_to_text("dbname=regress_doesnotexist user=doesnotexist password=secret")); +} + PG_FUNCTION_INFO_V1(is_catalog_text_unique_index_oid); Datum is_catalog_text_unique_index_oid(PG_FUNCTION_ARGS) diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index d93cbc279d9..a642b368183 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -2,6 +2,17 @@ -- SUBSCRIPTION -- +-- directory paths and dlsuffix are passed to us in environment variables +\getenv libdir PG_LIBDIR +\getenv dlsuffix PG_DLSUFFIX + +\set regresslib :libdir '/regress' :dlsuffix + +CREATE FUNCTION test_fdw_connection(oid, oid, internal) + RETURNS text + AS :'regresslib', 'test_fdw_connection' + LANGUAGE C; + CREATE ROLE regress_subscription_user LOGIN SUPERUSER; CREATE ROLE regress_subscription_user2; CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription; @@ -85,6 +96,50 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'i_dont_exist=param' PUBLICATION -- connecting, so this is reliable and safe) CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub; +CREATE FOREIGN DATA WRAPPER test_fdw; +CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw; + +GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; + +-- fail, need USAGE privileges on server +CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false); + +RESET SESSION AUTHORIZATION; +GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; + +-- fail, need user mapping +CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false); + +CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret'); + +-- fail, need CONNECTION clause +CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false); + +RESET SESSION AUTHORIZATION; +ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection; +SET SESSION AUTHORIZATION regress_subscription_user3; + +CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false); + +DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server; +RESET SESSION AUTHORIZATION; +REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3; +SET SESSION AUTHORIZATION regress_subscription_user3; + +-- fail, must connect but lacks USAGE on server, as well as user mapping +DROP SUBSCRIPTION regress_testsub6; + +ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub6; + +SET SESSION AUTHORIZATION regress_subscription_user; +REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3; + +DROP SERVER test_server; +DROP FOREIGN DATA WRAPPER test_fdw; + -- fail - invalid connection string during ALTER ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';