CREATE SUBSCRIPTION ... SERVER.

Allow CREATE SUBSCRIPTION to accept a foreign server using the SERVER
clause instead of a raw connection string using the CONNECTION clause.

  * Enables a user with sufficient privileges to create a subscription
    using a foreign server by name without specifying the connection
    details.

  * Integrates with user mappings (and other FDW infrastructure) using
    the subscription owner.

  * Provides a layer of indirection to manage multiple subscriptions
    to the same remote server more easily.

Also add CREATE FOREIGN DATA WRAPPER ... CONNECTION clause to specify
a connection_function. To be eligible for a subscription, the foreign
server's foreign data wrapper must specify a connection_function.

Add connection_function support to postgres_fdw, and bump postgres_fdw
version to 1.3.

Bump catversion.

Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.camel@j-davis.com
This commit is contained in:
Jeff Davis 2026-03-06 08:27:56 -08:00
parent 868825aaeb
commit 8185bb5347
36 changed files with 1075 additions and 255 deletions

View file

@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
SHLIB_LINK_INTERNAL = $(libpq)
EXTENSION = postgres_fdw
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql postgres_fdw--1.2--1.3.sql
REGRESS = postgres_fdw query_cancel
ISOLATION = eval_plan_qual

View file

@ -132,6 +132,7 @@ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
PG_FUNCTION_INFO_V1(postgres_fdw_connection);
/* prototypes of private functions */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
@ -476,6 +477,142 @@ pgfdw_security_check(const char **keywords, const char **values, UserMapping *us
errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
}
/*
* Construct connection params from generic options of ForeignServer and
* UserMapping. (Some of them might not be libpq options, in which case we'll
* just waste a few array slots.)
*/
static void
construct_connection_params(ForeignServer *server, UserMapping *user,
const char ***p_keywords, const char ***p_values,
char **p_appname)
{
const char **keywords;
const char **values;
char *appname = NULL;
int n;
/*
* Add 4 extra slots for application_name, fallback_application_name,
* client_encoding, end marker, and 3 extra slots for scram keys and
* required scram pass-through options.
*/
n = list_length(server->options) + list_length(user->options) + 4 + 3;
keywords = (const char **) palloc(n * sizeof(char *));
values = (const char **) palloc(n * sizeof(char *));
n = 0;
n += ExtractConnectionOptions(server->options,
keywords + n, values + n);
n += ExtractConnectionOptions(user->options,
keywords + n, values + n);
/*
* Use pgfdw_application_name as application_name if set.
*
* PQconnectdbParams() processes the parameter arrays from start to end.
* If any key word is repeated, the last value is used. Therefore note
* that pgfdw_application_name must be added to the arrays after options
* of ForeignServer are, so that it can override application_name set in
* ForeignServer.
*/
if (pgfdw_application_name && *pgfdw_application_name != '\0')
{
keywords[n] = "application_name";
values[n] = pgfdw_application_name;
n++;
}
/*
* Search the parameter arrays to find application_name setting, and
* replace escape sequences in it with status information if found. The
* arrays are searched backwards because the last value is used if
* application_name is repeatedly set.
*/
for (int i = n - 1; i >= 0; i--)
{
if (strcmp(keywords[i], "application_name") == 0 &&
*(values[i]) != '\0')
{
/*
* Use this application_name setting if it's not empty string even
* after any escape sequences in it are replaced.
*/
appname = process_pgfdw_appname(values[i]);
if (appname[0] != '\0')
{
values[i] = appname;
break;
}
/*
* This empty application_name is not used, so we set values[i] to
* NULL and keep searching the array to find the next one.
*/
values[i] = NULL;
pfree(appname);
appname = NULL;
}
}
*p_appname = appname;
/* Use "postgres_fdw" as fallback_application_name */
keywords[n] = "fallback_application_name";
values[n] = "postgres_fdw";
n++;
/* Set client_encoding so that libpq can convert encoding properly. */
keywords[n] = "client_encoding";
values[n] = GetDatabaseEncodingName();
n++;
/* Add required SCRAM pass-through connection options if it's enabled. */
if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
{
int len;
int encoded_len;
keywords[n] = "scram_client_key";
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
/* don't forget the zero-terminator */
values[n] = palloc0(len + 1);
encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
sizeof(MyProcPort->scram_ClientKey),
(char *) values[n], len);
if (encoded_len < 0)
elog(ERROR, "could not encode SCRAM client key");
n++;
keywords[n] = "scram_server_key";
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
/* don't forget the zero-terminator */
values[n] = palloc0(len + 1);
encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
sizeof(MyProcPort->scram_ServerKey),
(char *) values[n], len);
if (encoded_len < 0)
elog(ERROR, "could not encode SCRAM server key");
n++;
/*
* Require scram-sha-256 to ensure that no other auth method is used
* when connecting with foreign server.
*/
keywords[n] = "require_auth";
values[n] = "scram-sha-256";
n++;
}
keywords[n] = values[n] = NULL;
/* Verify the set of connection parameters. */
check_conn_params(keywords, values, user);
*p_keywords = keywords;
*p_values = values;
}
/*
* Connect to remote server using specified server and user mapping properties.
*/
@ -491,127 +628,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
{
const char **keywords;
const char **values;
char *appname = NULL;
int n;
char *appname;
/*
* Construct connection params from generic options of ForeignServer
* and UserMapping. (Some of them might not be libpq options, in
* which case we'll just waste a few array slots.) Add 4 extra slots
* for application_name, fallback_application_name, client_encoding,
* end marker, and 3 extra slots for scram keys and required scram
* pass-through options.
*/
n = list_length(server->options) + list_length(user->options) + 4 + 3;
keywords = (const char **) palloc(n * sizeof(char *));
values = (const char **) palloc(n * sizeof(char *));
n = 0;
n += ExtractConnectionOptions(server->options,
keywords + n, values + n);
n += ExtractConnectionOptions(user->options,
keywords + n, values + n);
/*
* Use pgfdw_application_name as application_name if set.
*
* PQconnectdbParams() processes the parameter arrays from start to
* end. If any key word is repeated, the last value is used. Therefore
* note that pgfdw_application_name must be added to the arrays after
* options of ForeignServer are, so that it can override
* application_name set in ForeignServer.
*/
if (pgfdw_application_name && *pgfdw_application_name != '\0')
{
keywords[n] = "application_name";
values[n] = pgfdw_application_name;
n++;
}
/*
* Search the parameter arrays to find application_name setting, and
* replace escape sequences in it with status information if found.
* The arrays are searched backwards because the last value is used if
* application_name is repeatedly set.
*/
for (int i = n - 1; i >= 0; i--)
{
if (strcmp(keywords[i], "application_name") == 0 &&
*(values[i]) != '\0')
{
/*
* Use this application_name setting if it's not empty string
* even after any escape sequences in it are replaced.
*/
appname = process_pgfdw_appname(values[i]);
if (appname[0] != '\0')
{
values[i] = appname;
break;
}
/*
* This empty application_name is not used, so we set
* values[i] to NULL and keep searching the array to find the
* next one.
*/
values[i] = NULL;
pfree(appname);
appname = NULL;
}
}
/* Use "postgres_fdw" as fallback_application_name */
keywords[n] = "fallback_application_name";
values[n] = "postgres_fdw";
n++;
/* Set client_encoding so that libpq can convert encoding properly. */
keywords[n] = "client_encoding";
values[n] = GetDatabaseEncodingName();
n++;
/* Add required SCRAM pass-through connection options if it's enabled. */
if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
{
int len;
int encoded_len;
keywords[n] = "scram_client_key";
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
/* don't forget the zero-terminator */
values[n] = palloc0(len + 1);
encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
sizeof(MyProcPort->scram_ClientKey),
(char *) values[n], len);
if (encoded_len < 0)
elog(ERROR, "could not encode SCRAM client key");
n++;
keywords[n] = "scram_server_key";
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
/* don't forget the zero-terminator */
values[n] = palloc0(len + 1);
encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
sizeof(MyProcPort->scram_ServerKey),
(char *) values[n], len);
if (encoded_len < 0)
elog(ERROR, "could not encode SCRAM server key");
n++;
/*
* Require scram-sha-256 to ensure that no other auth method is
* used when connecting with foreign server.
*/
keywords[n] = "require_auth";
values[n] = "scram-sha-256";
n++;
}
keywords[n] = values[n] = NULL;
/* Verify the set of connection parameters. */
check_conn_params(keywords, values, user);
construct_connection_params(server, user, &keywords, &values, &appname);
/* first time, allocate or get the custom wait event */
if (pgfdw_we_connect == 0)
@ -2310,6 +2329,56 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
}
}
/*
* Values in connection strings must be enclosed in single quotes. Single
* quotes and backslashes must be escaped with backslash. NB: these rules are
* different from the rules for escaping a SQL literal.
*/
static void
appendEscapedValue(StringInfo str, const char *val)
{
appendStringInfoChar(str, '\'');
for (int i = 0; val[i] != '\0'; i++)
{
if (val[i] == '\\' || val[i] == '\'')
appendStringInfoChar(str, '\\');
appendStringInfoChar(str, val[i]);
}
appendStringInfoChar(str, '\'');
}
Datum
postgres_fdw_connection(PG_FUNCTION_ARGS)
{
Oid userid = PG_GETARG_OID(0);
Oid serverid = PG_GETARG_OID(1);
ForeignServer *server = GetForeignServer(serverid);
UserMapping *user = GetUserMapping(userid, serverid);
StringInfoData str;
const char **keywords;
const char **values;
char *appname;
char *sep = "";
construct_connection_params(server, user, &keywords, &values, &appname);
initStringInfo(&str);
for (int i = 0; keywords[i] != NULL; i++)
{
if (values[i] == NULL)
continue;
appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
appendEscapedValue(&str, values[i]);
sep = " ";
}
if (appname != NULL)
pfree(appname);
pfree(keywords);
pfree(values);
PG_RETURN_TEXT_P(cstring_to_text(str.data));
}
/*
* List active foreign server connections.
*

View file

@ -255,6 +255,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
ANALYZE ft1;
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
-- ===================================================================
-- test subscription
-- ===================================================================
CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
DROP SUBSCRIPTION regress_pgfdw_subscription;
-- ===================================================================
-- test error case for create publication on foreign table
-- ===================================================================
CREATE PUBLICATION testpub_ftbl FOR TABLE ft1; -- should fail

View file

@ -27,6 +27,7 @@ install_data(
'postgres_fdw--1.0.sql',
'postgres_fdw--1.0--1.1.sql',
'postgres_fdw--1.1--1.2.sql',
'postgres_fdw--1.2--1.3.sql',
kwargs: contrib_data_args,
)
@ -50,6 +51,7 @@ tests += {
'tap': {
'tests': [
't/001_auth_scram.pl',
't/010_subscription.pl',
],
},
}

View file

@ -0,0 +1,12 @@
/* contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql */
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.3'" to load this file. \quit
-- takes internal parameter to prevent calling from SQL
CREATE FUNCTION postgres_fdw_connection(oid, oid, internal)
RETURNS text
AS 'MODULE_PATHNAME'
LANGUAGE C STRICT PARALLEL RESTRICTED;
ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection;

View file

@ -1,5 +1,5 @@
# postgres_fdw extension
comment = 'foreign-data wrapper for remote PostgreSQL servers'
default_version = '1.2'
default_version = '1.3'
module_pathname = '$libdir/postgres_fdw'
relocatable = true

View file

@ -244,6 +244,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
ANALYZE ft1;
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
-- ===================================================================
-- test subscription
-- ===================================================================
CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
DROP SUBSCRIPTION regress_pgfdw_subscription;
-- ===================================================================
-- test error case for create publication on foreign table
-- ===================================================================

View file

@ -0,0 +1,71 @@
# Copyright (c) 2021-2026, PostgreSQL Global Development Group
# Basic logical replication test
use strict;
use warnings FATAL => 'all';
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
# Initialize publisher node
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
$node_publisher->init(allows_streaming => 'logical');
$node_publisher->start;
# Create subscriber node
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
$node_subscriber->start;
# Create some preexisting content on publisher
$node_publisher->safe_psql('postgres',
"CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a");
# Replicate the changes without columns
$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()");
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_no_col default VALUES");
# Setup structure on subscriber
$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)");
# Setup logical replication
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
my $publisher_host = $node_publisher->host;
my $publisher_port = $node_publisher->port;
$node_subscriber->safe_psql('postgres',
"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
);
$node_subscriber->safe_psql('postgres',
"CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
);
$node_subscriber->safe_psql('postgres',
"CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')"
);
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
);
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
is($result, qq(1002), 'check that initial data was copied to subscriber');
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a");
$node_publisher->wait_for_catchup('tap_sub');
$result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
is($result, qq(1050), 'check that inserted data was copied to subscriber');
done_testing();

View file

@ -2577,7 +2577,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
<para>
To create a subscription, the user must have the privileges of
the <literal>pg_create_subscription</literal> role, as well as
<literal>CREATE</literal> privileges on the database.
<literal>CREATE</literal> privileges on the database. If
<literal>SERVER</literal> is specified, the user also must have
<literal>USAGE</literal> privileges on the server.
</para>
<para>

View file

@ -1049,6 +1049,32 @@ postgres=# SELECT postgres_fdw_disconnect_all();
</para>
</sect2>
<sect2 id="postgres-fdw-server-subscription">
<title>Subscription Management</title>
<para>
<filename>postgres_fdw</filename> supports subscription connections using
the same options described in <xref
linkend="postgres-fdw-options-connection"/>.
</para>
<para>
For example, assuming the remote server <literal>foreign-host</literal> has
a publication <literal>testpub</literal>:
<programlisting>
CREATE SERVER subscription_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'foreign-host', dbname 'foreign_db');
CREATE USER MAPPING FOR local_user SERVER subscription_server OPTIONS (user 'foreign_user', password 'password');
CREATE SUBSCRIPTION my_subscription SERVER subscription_server PUBLICATION testpub;
</programlisting>
</para>
<para>
To create a subscription, the user must be a member of the <xref
linkend="predefined-role-pg-create-subscription"/> role and have
<literal>USAGE</literal> privileges on the server.
</para>
</sect2>
<sect2 id="postgres-fdw-transaction-management">
<title>Transaction Management</title>

View file

@ -24,6 +24,7 @@ PostgreSQL documentation
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
[ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ]
[ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ]
[ CONNECTION <replaceable class="parameter">connection_function</replaceable> | NO CONNECTION ]
[ OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ]) ]
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@ -112,6 +113,25 @@ ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> REN
</listitem>
</varlistentry>
<varlistentry>
<term><literal>CONNECTION <replaceable class="parameter">connection_function</replaceable></literal></term>
<listitem>
<para>
Specifies a new connection function for the foreign-data wrapper.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>NO CONNECTION</literal></term>
<listitem>
<para>
This is used to specify that the foreign-data wrapper should no
longer have a connection function.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ] )</literal></term>
<listitem>

View file

@ -21,6 +21,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@ -102,13 +103,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
</listitem>
</varlistentry>
<varlistentry id="sql-altersubscription-params-server">
<term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
<listitem>
<para>
This clause replaces the foreign server or connection string originally
set by <xref linkend="sql-createsubscription"/> with the foreign server
<replaceable>servername</replaceable>.
</para>
</listitem>
</varlistentry>
<varlistentry id="sql-altersubscription-params-connection">
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<listitem>
<para>
This clause replaces the connection string originally set by
<xref linkend="sql-createsubscription"/>. See there for more
information.
This clause replaces the foreign server or connection string originally
set by <xref linkend="sql-createsubscription"/> with the connection
string <replaceable>conninfo</replaceable>.
</para>
</listitem>
</varlistentry>

View file

@ -24,6 +24,7 @@ PostgreSQL documentation
CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
[ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ]
[ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ]
[ CONNECTION <replaceable class="parameter">connection_function</replaceable> | NO CONNECTION ]
[ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
</synopsis>
</refsynopsisdiv>
@ -99,6 +100,25 @@ CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>CONNECTION <replaceable class="parameter">connection_function</replaceable></literal></term>
<listitem>
<para>
<replaceable class="parameter">connection_function</replaceable> is the
name of a previously registered function that will be called to generate
the postgres connection string when a foreign server is used as part of
<xref linkend="sql-createsubscription"/>. If no connection function or
<literal>NO CONNECTION</literal> is specified, then servers using this
foreign data wrapper cannot be used for <literal>CREATE
SUBSCRIPTION</literal>. The connection function must take three
arguments: one of type <type>oid</type> for the user, one of type
<type>oid</type> for the server, and an unused third argument of type
<type>internal</type> (which prevents calling the function in other
contexts).
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><literal>OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] )</literal></term>
<listitem>

View file

@ -42,6 +42,13 @@ CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</repl
means of user mappings.
</para>
<para>
If the foreign data wrapper <replaceable>fdw_name</replaceable> is
specified with a <literal>CONNECTION</literal> clause, then <xref
linkend="sql-createsubscription"/> may use this foreign server for
connection information.
</para>
<para>
The server name must be unique within the database.
</para>

View file

@ -22,7 +22,7 @@ PostgreSQL documentation
<refsynopsisdiv>
<synopsis>
CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
{ SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
[ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
</synopsis>
@ -77,6 +77,20 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</listitem>
</varlistentry>
<varlistentry id="sql-createsubscription-params-server">
<term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
<listitem>
<para>
A foreign server to use for the connection. The server's foreign data
wrapper must have a <replaceable>connection_function</replaceable>
registered, and a user mapping for the subscription owner on the server
must exist. Additionally, the subscription owner must have
<literal>USAGE</literal> privileges on
<replaceable>servername</replaceable>.
</para>
</listitem>
</varlistentry>
<varlistentry id="sql-createsubscription-params-connection">
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
<listitem>

View file

@ -895,6 +895,17 @@ findDependentObjects(const ObjectAddress *object,
object->objectSubId == 0)
continue;
/*
* Check that the dependent object is not in a shared catalog, which
* is not supported by doDeletion().
*/
if (IsSharedRelation(otherObject.classId))
ereport(ERROR,
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
errmsg("cannot drop %s because %s depends on it",
getObjectDescription(object, false),
getObjectDescription(&otherObject, false))));
/*
* Must lock the dependent object before recursing to it.
*/

View file

@ -19,11 +19,14 @@
#include "access/htup_details.h"
#include "access/tableam.h"
#include "catalog/indexing.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@ -69,7 +72,7 @@ GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
* Fetch the subscription from the syscache.
*/
Subscription *
GetSubscription(Oid subid, bool missing_ok)
GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
{
HeapTuple tup;
Subscription *sub;
@ -108,10 +111,35 @@ GetSubscription(Oid subid, bool missing_ok)
sub->retentionactive = subform->subretentionactive;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
tup,
Anum_pg_subscription_subconninfo);
sub->conninfo = TextDatumGetCString(datum);
if (OidIsValid(subform->subserver))
{
AclResult aclresult;
/* recheck ACL if requested */
if (aclcheck)
{
aclresult = object_aclcheck(ForeignServerRelationId,
subform->subserver,
subform->subowner, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
GetUserNameFromId(subform->subowner, false),
ForeignServerName(subform->subserver))));
}
sub->conninfo = ForeignServerConnectionString(subform->subowner,
subform->subserver);
}
else
{
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
tup,
Anum_pg_subscription_subconninfo);
sub->conninfo = TextDatumGetCString(datum);
}
/* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,

View file

@ -1449,7 +1449,7 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr,
subpasswordrequired, subrunasowner, subfailover,
subretaindeadtuples, submaxretention, subretentionactive,
subslotname, subsynccommit, subpublications, suborigin)
subserver, subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public;
CREATE VIEW pg_stat_subscription_stats AS

View file

@ -522,21 +522,53 @@ lookup_fdw_validator_func(DefElem *validator)
/* validator's return value is ignored, so we don't check the type */
}
/*
* Convert a connection string function name passed from the parser to an Oid.
*/
static Oid
lookup_fdw_connection_func(DefElem *connection)
{
Oid connectionOid;
Oid funcargtypes[3];
if (connection == NULL || connection->arg == NULL)
return InvalidOid;
/* connection string functions take user oid, server oid */
funcargtypes[0] = OIDOID;
funcargtypes[1] = OIDOID;
funcargtypes[2] = INTERNALOID;
connectionOid = LookupFuncName((List *) connection->arg, 3, funcargtypes, false);
/* check that connection string function has correct return type */
if (get_func_rettype(connectionOid) != TEXTOID)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("function %s must return type %s",
NameListToString((List *) connection->arg), "text")));
return connectionOid;
}
/*
* Process function options of CREATE/ALTER FDW
*/
static void
parse_func_options(ParseState *pstate, List *func_options,
bool *handler_given, Oid *fdwhandler,
bool *validator_given, Oid *fdwvalidator)
bool *validator_given, Oid *fdwvalidator,
bool *connection_given, Oid *fdwconnection)
{
ListCell *cell;
*handler_given = false;
*validator_given = false;
*connection_given = false;
/* return InvalidOid if not given */
*fdwhandler = InvalidOid;
*fdwvalidator = InvalidOid;
*fdwconnection = InvalidOid;
foreach(cell, func_options)
{
@ -556,6 +588,13 @@ parse_func_options(ParseState *pstate, List *func_options,
*validator_given = true;
*fdwvalidator = lookup_fdw_validator_func(def);
}
else if (strcmp(def->defname, "connection") == 0)
{
if (*connection_given)
errorConflictingDefElem(def, pstate);
*connection_given = true;
*fdwconnection = lookup_fdw_connection_func(def);
}
else
elog(ERROR, "option \"%s\" not recognized",
def->defname);
@ -575,8 +614,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
Oid fdwId;
bool handler_given;
bool validator_given;
bool connection_given;
Oid fdwhandler;
Oid fdwvalidator;
Oid fdwconnection;
Datum fdwoptions;
Oid ownerId;
ObjectAddress myself;
@ -620,10 +661,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
/* Lookup handler and validator functions, if given */
parse_func_options(pstate, stmt->func_options,
&handler_given, &fdwhandler,
&validator_given, &fdwvalidator);
&validator_given, &fdwvalidator,
&connection_given, &fdwconnection);
values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler);
values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
@ -695,8 +738,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
Datum datum;
bool handler_given;
bool validator_given;
bool connection_given;
Oid fdwhandler;
Oid fdwvalidator;
Oid fdwconnection;
ObjectAddress myself;
rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock);
@ -726,7 +771,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
parse_func_options(pstate, stmt->func_options,
&handler_given, &fdwhandler,
&validator_given, &fdwvalidator);
&validator_given, &fdwvalidator,
&connection_given, &fdwconnection);
if (handler_given)
{
@ -764,6 +810,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
fdwvalidator = fdwForm->fdwvalidator;
}
if (connection_given)
{
repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true;
}
/*
* If options specified, validate and update.
*/

View file

@ -27,13 +27,16 @@
#include "catalog/objectaddress.h"
#include "catalog/pg_authid_d.h"
#include "catalog/pg_database_d.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "catalog/pg_user_mapping.h"
#include "commands/defrem.h"
#include "commands/event_trigger.h"
#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "pgstat.h"
@ -619,6 +622,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
Datum values[Natts_pg_subscription];
Oid owner = GetUserId();
HeapTuple tup;
Oid serverid;
char *conninfo;
char originname[NAMEDATALEN];
List *publications;
@ -730,15 +734,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.wal_receiver_timeout == NULL)
opts.wal_receiver_timeout = "-1";
conninfo = stmt->conninfo;
publications = stmt->publication;
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
if (stmt->servername)
{
ForeignServer *server;
Assert(!stmt->conninfo);
conninfo = NULL;
server = GetForeignServerByName(stmt->servername, false);
aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
/* make sure a user mapping exists */
GetUserMapping(owner, server->serverid);
serverid = server->serverid;
conninfo = ForeignServerConnectionString(owner, serverid);
}
else
{
Assert(stmt->conninfo);
serverid = InvalidOid;
conninfo = stmt->conninfo;
}
/* Check the connection info string. */
walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
publications = stmt->publication;
/* Everything ok, form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
@ -768,8 +797,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
Int32GetDatum(opts.maxretention);
values[Anum_pg_subscription_subretentionactive - 1] =
Int32GetDatum(opts.retaindeadtuples);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
values[Anum_pg_subscription_subserver - 1] = serverid;
if (!OidIsValid(serverid))
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
else
nulls[Anum_pg_subscription_subconninfo - 1] = true;
if (opts.slot_name)
values[Anum_pg_subscription_subslotname - 1] =
DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@ -792,6 +825,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
ObjectAddressSet(myself, SubscriptionRelationId, subid);
if (stmt->servername)
{
ObjectAddress referenced;
Assert(OidIsValid(serverid));
ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
}
/*
* A replication origin is currently created for all subscriptions,
* including those that only contain sequences or are otherwise empty.
@ -945,8 +990,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
if (opts.enabled || opts.retaindeadtuples)
ApplyLauncherWakeupAtCommit();
ObjectAddressSet(myself, SubscriptionRelationId, subid);
InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
return myself;
@ -1410,7 +1453,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
stmt->subname);
sub = GetSubscription(subid, false);
/*
* Skip ACL checks on the subscription's foreign server, if any. If
* changing the server (or replacing it with a raw connection), then the
* old one will be removed anyway. If changing something unrelated,
* there's no need to do an additional ACL check here; that will be done
* by the subscription worker anyway.
*/
sub = GetSubscription(subid, false, false);
retain_dead_tuples = sub->retaindeadtuples;
origin = sub->origin;
@ -1435,6 +1485,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));
ObjectAddressSet(myself, SubscriptionRelationId, subid);
switch (stmt->kind)
{
case ALTER_SUBSCRIPTION_OPTIONS:
@ -1753,7 +1805,78 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
break;
}
case ALTER_SUBSCRIPTION_SERVER:
{
ForeignServer *new_server;
ObjectAddress referenced;
AclResult aclresult;
char *conninfo;
/*
* Remove what was there before, either another foreign server
* or a connection string.
*/
if (form->subserver)
{
deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
DEPENDENCY_NORMAL,
ForeignServerRelationId, form->subserver);
}
else
{
nulls[Anum_pg_subscription_subconninfo - 1] = true;
replaces[Anum_pg_subscription_subconninfo - 1] = true;
}
/*
* Check that the subscription owner has USAGE privileges on
* the server.
*/
new_server = GetForeignServerByName(stmt->servername, false);
aclresult = object_aclcheck(ForeignServerRelationId,
new_server->serverid,
form->subowner, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
ereport(ERROR,
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
GetUserNameFromId(form->subowner, false),
ForeignServerName(new_server->serverid)));
/* make sure a user mapping exists */
GetUserMapping(form->subowner, new_server->serverid);
conninfo = ForeignServerConnectionString(form->subowner,
new_server->serverid);
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
/* Check the connection info string. */
walrcv_check_conninfo(conninfo,
sub->passwordrequired && !sub->ownersuperuser);
values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
replaces[Anum_pg_subscription_subserver - 1] = true;
ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
update_tuple = true;
}
break;
case ALTER_SUBSCRIPTION_CONNECTION:
/* remove reference to foreign server and dependencies, if present */
if (form->subserver)
{
deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
DEPENDENCY_NORMAL,
ForeignServerRelationId, form->subserver);
values[Anum_pg_subscription_subserver - 1] = InvalidOid;
replaces[Anum_pg_subscription_subserver - 1] = true;
}
/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);
/* Check the connection info string. */
@ -2038,8 +2161,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
table_close(rel, RowExclusiveLock);
ObjectAddressSet(myself, SubscriptionRelationId, subid);
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
/* Wake up related replication workers to handle this change quickly. */
@ -2068,7 +2189,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
ListCell *lc;
char originname[NAMEDATALEN];
char *err = NULL;
WalReceiverConn *wrconn;
WalReceiverConn *wrconn = NULL;
Form_pg_subscription form;
List *rstates;
bool must_use_password;
@ -2126,9 +2247,35 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
subname = pstrdup(NameStr(*DatumGetName(datum)));
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
Anum_pg_subscription_subconninfo);
conninfo = TextDatumGetCString(datum);
if (OidIsValid(form->subserver))
{
AclResult aclresult;
aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
form->subowner, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
{
/*
* Unable to generate connection string because permissions on the
* foreign server have been removed. Follow the same logic as an
* unusable subconninfo (which will result in an ERROR later
* unless slot_name = NONE).
*/
err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
GetUserNameFromId(form->subowner, false),
ForeignServerName(form->subserver));
conninfo = NULL;
}
else
conninfo = ForeignServerConnectionString(form->subowner,
form->subserver);
}
else
{
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
Anum_pg_subscription_subconninfo);
conninfo = TextDatumGetCString(datum);
}
/* Get slotname */
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@ -2227,6 +2374,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
}
/* Clean up dependencies */
deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
/* Remove any associated relation synchronization states. */
@ -2265,8 +2413,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
*/
load_file("libpqwalreceiver", false);
wrconn = walrcv_connect(conninfo, true, true, must_use_password,
subname, &err);
if (conninfo)
wrconn = walrcv_connect(conninfo, true, true, must_use_password,
subname, &err);
if (wrconn == NULL)
{
if (!slotname)
@ -2436,6 +2586,27 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
aclcheck_error(aclresult, OBJECT_DATABASE,
get_database_name(MyDatabaseId));
/*
* If the subscription uses a server, check that the new owner has USAGE
* privileges on the server and that a user mapping exists. Note: does not
* re-check the resulting connection string.
*/
if (OidIsValid(form->subserver))
{
Oid serverid = form->subserver;
aclresult = object_aclcheck(ForeignServerRelationId, serverid, newOwnerId, ACL_USAGE);
if (aclresult != ACLCHECK_OK)
ereport(ERROR,
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
GetUserNameFromId(newOwnerId, false),
ForeignServerName(serverid)));
/* make sure a user mapping exists */
GetUserMapping(newOwnerId, serverid);
}
form->subowner = newOwnerId;
CatalogTupleUpdate(rel, &tup->t_self, tup);

View file

@ -72,6 +72,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags)
fdw->fdwname = pstrdup(NameStr(fdwform->fdwname));
fdw->fdwhandler = fdwform->fdwhandler;
fdw->fdwvalidator = fdwform->fdwvalidator;
fdw->fdwconnection = fdwform->fdwconnection;
/* Extract the fdwoptions */
datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
@ -176,6 +177,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
}
/*
* ForeignServerName - get name of foreign server.
*/
char *
ForeignServerName(Oid serverid)
{
Form_pg_foreign_server serverform;
char *servername;
HeapTuple tp;
tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for foreign server %u", serverid);
serverform = (Form_pg_foreign_server) GETSTRUCT(tp);
servername = pstrdup(NameStr(serverform->srvname));
ReleaseSysCache(tp);
return servername;
}
/*
* GetForeignServerByName - look up the foreign server definition by name.
*/
@ -191,6 +217,66 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
}
/*
* Retrieve connection string from server's FDW.
*/
char *
ForeignServerConnectionString(Oid userid, Oid serverid)
{
MemoryContext tempContext;
MemoryContext oldcxt;
volatile text *connection_text = NULL;
char *result = NULL;
/*
* GetForeignServer, GetForeignDataWrapper, and the connection function
* itself all leak memory into CurrentMemoryContext. Switch to a temporary
* context for easy cleanup.
*/
tempContext = AllocSetContextCreate(CurrentMemoryContext,
"FDWConnectionContext",
ALLOCSET_SMALL_SIZES);
oldcxt = MemoryContextSwitchTo(tempContext);
PG_TRY();
{
ForeignServer *server;
ForeignDataWrapper *fdw;
Datum connection_datum;
server = GetForeignServer(serverid);
fdw = GetForeignDataWrapper(server->fdwid);
if (!OidIsValid(fdw->fdwconnection))
ereport(ERROR,
(errmsg("foreign data wrapper \"%s\" does not support subscription connections",
fdw->fdwname),
errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
connection_datum = OidFunctionCall3(fdw->fdwconnection,
ObjectIdGetDatum(userid),
ObjectIdGetDatum(serverid),
PointerGetDatum(NULL));
connection_text = DatumGetTextPP(connection_datum);
}
PG_FINALLY();
{
MemoryContextSwitchTo(oldcxt);
if (connection_text)
result = text_to_cstring((text *) connection_text);
MemoryContextDelete(tempContext);
}
PG_END_TRY();
return result;
}
/*
* GetUserMapping - look up the user mapping.
*

View file

@ -5583,6 +5583,8 @@ fdw_option:
| NO HANDLER { $$ = makeDefElem("handler", NULL, @1); }
| VALIDATOR handler_name { $$ = makeDefElem("validator", (Node *) $2, @1); }
| NO VALIDATOR { $$ = makeDefElem("validator", NULL, @1); }
| CONNECTION handler_name { $$ = makeDefElem("connection", (Node *) $2, @1); }
| NO CONNECTION { $$ = makeDefElem("connection", NULL, @1); }
;
fdw_options:
@ -11057,6 +11059,16 @@ CreateSubscriptionStmt:
n->options = $8;
$$ = (Node *) n;
}
| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
{
CreateSubscriptionStmt *n =
makeNode(CreateSubscriptionStmt);
n->subname = $3;
n->servername = $5;
n->publication = $7;
n->options = $8;
$$ = (Node *) n;
}
;
/*****************************************************************************
@ -11086,6 +11098,16 @@ AlterSubscriptionStmt:
n->conninfo = $5;
$$ = (Node *) n;
}
| ALTER SUBSCRIPTION name SERVER name
{
AlterSubscriptionStmt *n =
makeNode(AlterSubscriptionStmt);
n->kind = ALTER_SUBSCRIPTION_SERVER;
n->subname = $3;
n->servername = $5;
$$ = (Node *) n;
}
| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
{
AlterSubscriptionStmt *n =

View file

@ -5059,7 +5059,7 @@ maybe_reread_subscription(void)
/* Ensure allocations in permanent context. */
oldctx = MemoryContextSwitchTo(ApplyContext);
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
/*
* Exit if the subscription was removed. This normally should not happen
@ -5201,7 +5201,9 @@ set_wal_receiver_timeout(void)
}
/*
* Callback from subscription syscache invalidation.
* Callback from subscription syscache invalidation. Also needed for server or
* user mapping invalidation, which can change the connection information for
* subscriptions that connect using a server object.
*/
static void
subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
@ -5806,7 +5808,7 @@ InitializeLogRepWorker(void)
*/
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
AccessShareLock);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
if (!MySubscription)
{
ereport(LOG,
@ -5871,6 +5873,22 @@ InitializeLogRepWorker(void)
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
(Datum) 0);
/* Changes to foreign servers may affect subscriptions using SERVER. */
CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
subscription_change_cb,
(Datum) 0);
/* Changes to user mappings may affect subscriptions using SERVER. */
CacheRegisterSyscacheCallback(USERMAPPINGOID,
subscription_change_cb,
(Datum) 0);
/*
* Changes to FDW connection_function may affect subscriptions using
* SERVER.
*/
CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID,
subscription_change_cb,
(Datum) 0);
CacheRegisterSyscacheCallback(AUTHOID,
subscription_change_cb,

View file

@ -5182,6 +5182,7 @@ getSubscriptions(Archive *fout)
int i_subdisableonerr;
int i_subpasswordrequired;
int i_subrunasowner;
int i_subservername;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
@ -5286,14 +5287,24 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query,
" s.subwalrcvtimeout\n");
" s.subwalrcvtimeout,\n");
else
appendPQExpBufferStr(query,
" '-1' AS subwalrcvtimeout\n");
" '-1' AS subwalrcvtimeout,\n");
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query, " fs.srvname AS subservername\n");
else
appendPQExpBufferStr(query, " NULL AS subservername\n");
appendPQExpBufferStr(query,
"FROM pg_subscription s\n");
if (fout->remoteVersion >= 190000)
appendPQExpBufferStr(query,
"LEFT JOIN pg_catalog.pg_foreign_server fs \n"
" ON fs.oid = s.subserver \n");
if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
appendPQExpBufferStr(query,
"LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
@ -5325,6 +5336,7 @@ getSubscriptions(Archive *fout)
i_subfailover = PQfnumber(res, "subfailover");
i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
i_submaxretention = PQfnumber(res, "submaxretention");
i_subservername = PQfnumber(res, "subservername");
i_subconninfo = PQfnumber(res, "subconninfo");
i_subslotname = PQfnumber(res, "subslotname");
i_subsynccommit = PQfnumber(res, "subsynccommit");
@ -5347,6 +5359,10 @@ getSubscriptions(Archive *fout)
subinfo[i].subenabled =
(strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
if (PQgetisnull(res, i, i_subservername))
subinfo[i].subservername = NULL;
else
subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
subinfo[i].subbinary =
(strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
@ -5363,8 +5379,11 @@ getSubscriptions(Archive *fout)
(strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
subinfo[i].submaxretention =
atoi(PQgetvalue(res, i, i_submaxretention));
subinfo[i].subconninfo =
pg_strdup(PQgetvalue(res, i, i_subconninfo));
if (PQgetisnull(res, i, i_subconninfo))
subinfo[i].subconninfo = NULL;
else
subinfo[i].subconninfo =
pg_strdup(PQgetvalue(res, i, i_subconninfo));
if (PQgetisnull(res, i, i_subslotname))
subinfo[i].subslotname = NULL;
else
@ -5575,9 +5594,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
qsubname);
appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
qsubname);
appendStringLiteralAH(query, subinfo->subconninfo, fout);
if (subinfo->subservername)
{
appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
}
else
{
appendPQExpBuffer(query, "CONNECTION ");
appendStringLiteralAH(query, subinfo->subconninfo, fout);
}
/* Build list of quoted publications and append them to query. */
if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))

View file

@ -720,6 +720,7 @@ typedef struct _SubscriptionInfo
bool subfailover;
bool subretaindeadtuples;
int submaxretention;
char *subservername;
char *subconninfo;
char *subslotname;
char *subsynccommit;

View file

@ -6895,7 +6895,7 @@ describeSubscriptions(const char *pattern, bool verbose)
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
false, false, false, false, false, false, false, false, false, false,
false, false, false, false, false, false};
false, false, false, false, false, false, false};
if (pset.sversion < 100000)
{
@ -6965,6 +6965,10 @@ describeSubscriptions(const char *pattern, bool verbose)
gettext_noop("Failover"));
if (pset.sversion >= 190000)
{
appendPQExpBuffer(&buf,
", (select srvname from pg_foreign_server where oid=subserver) AS \"%s\"\n",
gettext_noop("Server"));
appendPQExpBuffer(&buf,
", subretaindeadtuples AS \"%s\"\n",
gettext_noop("Retain dead tuples"));

View file

@ -2332,7 +2332,7 @@ match_previous_words(int pattern_id,
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
"RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES",
"SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
"SERVER", "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
/* ALTER SUBSCRIPTION <name> REFRESH */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH"))
COMPLETE_WITH("PUBLICATION", "SEQUENCES");
@ -3870,9 +3870,16 @@ match_previous_words(int pattern_id,
/* CREATE SUBSCRIPTION */
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION");
COMPLETE_WITH("SERVER", "CONNECTION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER", MatchAny))
COMPLETE_WITH("PUBLICATION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
COMPLETE_WITH("PUBLICATION");
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER",
MatchAny, "PUBLICATION"))
{
/* complete with nothing here as this refers to remote publications */
}
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
MatchAny, "PUBLICATION"))
{

View file

@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202603061
#define CATALOG_VERSION_NO 202603062
#endif

View file

@ -38,6 +38,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId)
Oid fdwvalidator BKI_LOOKUP_OPT(pg_proc); /* option validation
* function, or 0 if
* none */
Oid fdwconnection BKI_LOOKUP_OPT(pg_proc); /* connection string
* function, or 0 if
* none */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
aclitem fdwacl[1]; /* access permissions */

View file

@ -92,9 +92,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
* exceeded max_retention_duration, when
* defined */
Oid subserver BKI_LOOKUP_OPT(pg_foreign_server); /* If connection uses
* server */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
text subconninfo; /* Set if connecting with connection string */
/* Slot name on publisher */
NameData subslotname BKI_FORCE_NULL;
@ -207,7 +210,8 @@ typedef struct Subscription
#endif /* EXPOSE_TO_CLIENT_CODE */
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
extern Subscription *GetSubscription(Oid subid, bool missing_ok,
bool aclcheck);
extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid);

View file

@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper
char *fdwname; /* Name of the FDW */
Oid fdwhandler; /* Oid of handler function, or 0 */
Oid fdwvalidator; /* Oid of validator function, or 0 */
Oid fdwconnection; /* Oid of connection string function, or 0 */
List *options; /* fdwoptions as DefElem list */
} ForeignDataWrapper;
@ -65,10 +66,12 @@ typedef struct ForeignTable
extern ForeignServer *GetForeignServer(Oid serverid);
extern char *ForeignServerName(Oid serverid);
extern ForeignServer *GetForeignServerExtended(Oid serverid,
bits16 flags);
extern ForeignServer *GetForeignServerByName(const char *srvname,
bool missing_ok);
extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,

View file

@ -4383,6 +4383,7 @@ typedef struct CreateSubscriptionStmt
{
NodeTag type;
char *subname; /* Name of the subscription */
char *servername; /* Server name of publisher */
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */
@ -4391,6 +4392,7 @@ typedef struct CreateSubscriptionStmt
typedef enum AlterSubscriptionType
{
ALTER_SUBSCRIPTION_OPTIONS,
ALTER_SUBSCRIPTION_SERVER,
ALTER_SUBSCRIPTION_CONNECTION,
ALTER_SUBSCRIPTION_SET_PUBLICATION,
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@ -4406,6 +4408,7 @@ typedef struct AlterSubscriptionStmt
NodeTag type;
AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
char *subname; /* Name of the subscription */
char *servername; /* Server name of publisher */
char *conninfo; /* Connection string to publisher */
List *publication; /* One or more publication to subscribe to */
List *options; /* List of DefElem nodes */

View file

@ -224,6 +224,7 @@ NOTICE: checking pg_extension {extconfig} => pg_class {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid}
NOTICE: checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid}
NOTICE: checking pg_foreign_server {srvowner} => pg_authid {oid}
NOTICE: checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid}
NOTICE: checking pg_user_mapping {umuser} => pg_authid {oid}
@ -269,5 +270,6 @@ NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid}
NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid}
NOTICE: checking pg_subscription {subdbid} => pg_database {oid}
NOTICE: checking pg_subscription {subowner} => pg_authid {oid}
NOTICE: checking pg_subscription {subserver} => pg_foreign_server {oid}
NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid}
NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid}

View file

@ -1,6 +1,14 @@
--
-- SUBSCRIPTION
--
-- directory paths and dlsuffix are passed to us in environment variables
\getenv libdir PG_LIBDIR
\getenv dlsuffix PG_DLSUFFIX
\set regresslib :libdir '/regress' :dlsuffix
CREATE FUNCTION test_fdw_connection(oid, oid, internal)
RETURNS text
AS :'regresslib', 'test_fdw_connection'
LANGUAGE C;
CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
CREATE ROLE regress_subscription_user2;
CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription;
@ -116,18 +124,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+ regress_testsub4
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
\dRs+ regress_testsub4
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
DROP SUBSCRIPTION regress_testsub3;
@ -140,15 +148,53 @@ ERROR: invalid connection string syntax: invalid connection option "i_dont_exis
-- connecting, so this is reliable and safe)
CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
ERROR: subscription "regress_testsub5" could not connect to the publisher: invalid port number: "-1"
CREATE FOREIGN DATA WRAPPER test_fdw;
CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw;
GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, need USAGE privileges on server
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
ERROR: permission denied for foreign server test_server
RESET SESSION AUTHORIZATION;
GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, need user mapping
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
ERROR: user mapping not found for user "regress_subscription_user3", server "test_server"
CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
-- fail, need CONNECTION clause
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
ERROR: foreign data wrapper "test_fdw" does not support subscription connections
DETAIL: Foreign data wrapper must be defined with CONNECTION specified.
RESET SESSION AUTHORIZATION;
ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
SET SESSION AUTHORIZATION regress_subscription_user3;
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
RESET SESSION AUTHORIZATION;
REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, must connect but lacks USAGE on server, as well as user mapping
DROP SUBSCRIPTION regress_testsub6;
ERROR: could not connect to publisher when attempting to drop replication slot "dummy": subscription owner "regress_subscription_user3" does not have permission on foreign server "test_server"
HINT: Use ALTER SUBSCRIPTION ... DISABLE to disable the subscription, and then use ALTER SUBSCRIPTION ... SET (slot_name = NONE) to disassociate it from the slot.
ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub6;
SET SESSION AUTHORIZATION regress_subscription_user;
REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
DROP SERVER test_server;
DROP FOREIGN DATA WRAPPER test_fdw;
-- fail - invalid connection string during ALTER
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | test subscription
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | test subscription
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@ -157,10 +203,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
@ -176,10 +222,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
-- ok
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00012345 | test subscription
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00012345 | test subscription
(1 row)
-- ok - with lsn = NONE
@ -188,10 +234,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
ERROR: invalid WAL location (LSN): 0/0
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
(1 row)
BEGIN;
@ -227,10 +273,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = '80s');
ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = 'foobar');
ERROR: invalid value for parameter "wal_receiver_timeout": "foobar"
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 80s | 0/00000000 | test subscription
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | local | dbname=regress_doesnotexist2 | 80s | 0/00000000 | test subscription
(1 row)
-- rename back to keep the rest simple
@ -259,19 +305,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
DROP SUBSCRIPTION regress_testsub;
@ -283,27 +329,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
-- fail - publication already exists
@ -318,10 +364,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
ERROR: publication "testpub1" is already in subscription "regress_testsub"
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
-- fail - publication used more than once
@ -336,10 +382,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
-- ok - delete publications
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
DROP SUBSCRIPTION regress_testsub;
@ -375,19 +421,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
-- we can alter streaming when two_phase enabled
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -397,10 +443,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -413,18 +459,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -437,10 +483,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
@ -454,19 +500,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl
WARNING: subscription was created, but is not connected
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 1000 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
-- ok
ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
\dRs+
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
List of subscriptions
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);

View file

@ -729,6 +729,13 @@ test_fdw_handler(PG_FUNCTION_ARGS)
PG_RETURN_NULL();
}
PG_FUNCTION_INFO_V1(test_fdw_connection);
Datum
test_fdw_connection(PG_FUNCTION_ARGS)
{
PG_RETURN_TEXT_P(cstring_to_text("dbname=regress_doesnotexist user=doesnotexist password=secret"));
}
PG_FUNCTION_INFO_V1(is_catalog_text_unique_index_oid);
Datum
is_catalog_text_unique_index_oid(PG_FUNCTION_ARGS)

View file

@ -2,6 +2,17 @@
-- SUBSCRIPTION
--
-- directory paths and dlsuffix are passed to us in environment variables
\getenv libdir PG_LIBDIR
\getenv dlsuffix PG_DLSUFFIX
\set regresslib :libdir '/regress' :dlsuffix
CREATE FUNCTION test_fdw_connection(oid, oid, internal)
RETURNS text
AS :'regresslib', 'test_fdw_connection'
LANGUAGE C;
CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
CREATE ROLE regress_subscription_user2;
CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription;
@ -85,6 +96,50 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'i_dont_exist=param' PUBLICATION
-- connecting, so this is reliable and safe)
CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
CREATE FOREIGN DATA WRAPPER test_fdw;
CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw;
GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, need USAGE privileges on server
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
RESET SESSION AUTHORIZATION;
GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, need user mapping
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
-- fail, need CONNECTION clause
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
RESET SESSION AUTHORIZATION;
ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
SET SESSION AUTHORIZATION regress_subscription_user3;
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
RESET SESSION AUTHORIZATION;
REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
SET SESSION AUTHORIZATION regress_subscription_user3;
-- fail, must connect but lacks USAGE on server, as well as user mapping
DROP SUBSCRIPTION regress_testsub6;
ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
DROP SUBSCRIPTION regress_testsub6;
SET SESSION AUTHORIZATION regress_subscription_user;
REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
DROP SERVER test_server;
DROP FOREIGN DATA WRAPPER test_fdw;
-- fail - invalid connection string during ALTER
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';