mirror of
https://github.com/postgres/postgres.git
synced 2026-03-22 02:20:53 -04:00
CREATE SUBSCRIPTION ... SERVER.
Allow CREATE SUBSCRIPTION to accept a foreign server using the SERVER
clause instead of a raw connection string using the CONNECTION clause.
* Enables a user with sufficient privileges to create a subscription
using a foreign server by name without specifying the connection
details.
* Integrates with user mappings (and other FDW infrastructure) using
the subscription owner.
* Provides a layer of indirection to manage multiple subscriptions
to the same remote server more easily.
Also add CREATE FOREIGN DATA WRAPPER ... CONNECTION clause to specify
a connection_function. To be eligible for a subscription, the foreign
server's foreign data wrapper must specify a connection_function.
Add connection_function support to postgres_fdw, and bump postgres_fdw
version to 1.3.
Bump catversion.
Reviewed-by: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Reviewed-by: Shlok Kyal <shlok.kyal.oss@gmail.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/61831790a0a937038f78ce09f8dd4cef7de7456a.camel@j-davis.com
This commit is contained in:
parent
868825aaeb
commit
8185bb5347
36 changed files with 1075 additions and 255 deletions
|
|
@ -14,7 +14,7 @@ PG_CPPFLAGS = -I$(libpq_srcdir)
|
|||
SHLIB_LINK_INTERNAL = $(libpq)
|
||||
|
||||
EXTENSION = postgres_fdw
|
||||
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql
|
||||
DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql postgres_fdw--1.1--1.2.sql postgres_fdw--1.2--1.3.sql
|
||||
|
||||
REGRESS = postgres_fdw query_cancel
|
||||
ISOLATION = eval_plan_qual
|
||||
|
|
|
|||
|
|
@ -132,6 +132,7 @@ PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
|
|||
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
|
||||
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
|
||||
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
|
||||
PG_FUNCTION_INFO_V1(postgres_fdw_connection);
|
||||
|
||||
/* prototypes of private functions */
|
||||
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
|
||||
|
|
@ -476,6 +477,142 @@ pgfdw_security_check(const char **keywords, const char **values, UserMapping *us
|
|||
errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Construct connection params from generic options of ForeignServer and
|
||||
* UserMapping. (Some of them might not be libpq options, in which case we'll
|
||||
* just waste a few array slots.)
|
||||
*/
|
||||
static void
|
||||
construct_connection_params(ForeignServer *server, UserMapping *user,
|
||||
const char ***p_keywords, const char ***p_values,
|
||||
char **p_appname)
|
||||
{
|
||||
const char **keywords;
|
||||
const char **values;
|
||||
char *appname = NULL;
|
||||
int n;
|
||||
|
||||
/*
|
||||
* Add 4 extra slots for application_name, fallback_application_name,
|
||||
* client_encoding, end marker, and 3 extra slots for scram keys and
|
||||
* required scram pass-through options.
|
||||
*/
|
||||
n = list_length(server->options) + list_length(user->options) + 4 + 3;
|
||||
keywords = (const char **) palloc(n * sizeof(char *));
|
||||
values = (const char **) palloc(n * sizeof(char *));
|
||||
|
||||
n = 0;
|
||||
n += ExtractConnectionOptions(server->options,
|
||||
keywords + n, values + n);
|
||||
n += ExtractConnectionOptions(user->options,
|
||||
keywords + n, values + n);
|
||||
|
||||
/*
|
||||
* Use pgfdw_application_name as application_name if set.
|
||||
*
|
||||
* PQconnectdbParams() processes the parameter arrays from start to end.
|
||||
* If any key word is repeated, the last value is used. Therefore note
|
||||
* that pgfdw_application_name must be added to the arrays after options
|
||||
* of ForeignServer are, so that it can override application_name set in
|
||||
* ForeignServer.
|
||||
*/
|
||||
if (pgfdw_application_name && *pgfdw_application_name != '\0')
|
||||
{
|
||||
keywords[n] = "application_name";
|
||||
values[n] = pgfdw_application_name;
|
||||
n++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Search the parameter arrays to find application_name setting, and
|
||||
* replace escape sequences in it with status information if found. The
|
||||
* arrays are searched backwards because the last value is used if
|
||||
* application_name is repeatedly set.
|
||||
*/
|
||||
for (int i = n - 1; i >= 0; i--)
|
||||
{
|
||||
if (strcmp(keywords[i], "application_name") == 0 &&
|
||||
*(values[i]) != '\0')
|
||||
{
|
||||
/*
|
||||
* Use this application_name setting if it's not empty string even
|
||||
* after any escape sequences in it are replaced.
|
||||
*/
|
||||
appname = process_pgfdw_appname(values[i]);
|
||||
if (appname[0] != '\0')
|
||||
{
|
||||
values[i] = appname;
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* This empty application_name is not used, so we set values[i] to
|
||||
* NULL and keep searching the array to find the next one.
|
||||
*/
|
||||
values[i] = NULL;
|
||||
pfree(appname);
|
||||
appname = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
*p_appname = appname;
|
||||
|
||||
/* Use "postgres_fdw" as fallback_application_name */
|
||||
keywords[n] = "fallback_application_name";
|
||||
values[n] = "postgres_fdw";
|
||||
n++;
|
||||
|
||||
/* Set client_encoding so that libpq can convert encoding properly. */
|
||||
keywords[n] = "client_encoding";
|
||||
values[n] = GetDatabaseEncodingName();
|
||||
n++;
|
||||
|
||||
/* Add required SCRAM pass-through connection options if it's enabled. */
|
||||
if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
|
||||
{
|
||||
int len;
|
||||
int encoded_len;
|
||||
|
||||
keywords[n] = "scram_client_key";
|
||||
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
|
||||
/* don't forget the zero-terminator */
|
||||
values[n] = palloc0(len + 1);
|
||||
encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
|
||||
sizeof(MyProcPort->scram_ClientKey),
|
||||
(char *) values[n], len);
|
||||
if (encoded_len < 0)
|
||||
elog(ERROR, "could not encode SCRAM client key");
|
||||
n++;
|
||||
|
||||
keywords[n] = "scram_server_key";
|
||||
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
|
||||
/* don't forget the zero-terminator */
|
||||
values[n] = palloc0(len + 1);
|
||||
encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
|
||||
sizeof(MyProcPort->scram_ServerKey),
|
||||
(char *) values[n], len);
|
||||
if (encoded_len < 0)
|
||||
elog(ERROR, "could not encode SCRAM server key");
|
||||
n++;
|
||||
|
||||
/*
|
||||
* Require scram-sha-256 to ensure that no other auth method is used
|
||||
* when connecting with foreign server.
|
||||
*/
|
||||
keywords[n] = "require_auth";
|
||||
values[n] = "scram-sha-256";
|
||||
n++;
|
||||
}
|
||||
|
||||
keywords[n] = values[n] = NULL;
|
||||
|
||||
/* Verify the set of connection parameters. */
|
||||
check_conn_params(keywords, values, user);
|
||||
|
||||
*p_keywords = keywords;
|
||||
*p_values = values;
|
||||
}
|
||||
|
||||
/*
|
||||
* Connect to remote server using specified server and user mapping properties.
|
||||
*/
|
||||
|
|
@ -491,127 +628,9 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
|
|||
{
|
||||
const char **keywords;
|
||||
const char **values;
|
||||
char *appname = NULL;
|
||||
int n;
|
||||
char *appname;
|
||||
|
||||
/*
|
||||
* Construct connection params from generic options of ForeignServer
|
||||
* and UserMapping. (Some of them might not be libpq options, in
|
||||
* which case we'll just waste a few array slots.) Add 4 extra slots
|
||||
* for application_name, fallback_application_name, client_encoding,
|
||||
* end marker, and 3 extra slots for scram keys and required scram
|
||||
* pass-through options.
|
||||
*/
|
||||
n = list_length(server->options) + list_length(user->options) + 4 + 3;
|
||||
keywords = (const char **) palloc(n * sizeof(char *));
|
||||
values = (const char **) palloc(n * sizeof(char *));
|
||||
|
||||
n = 0;
|
||||
n += ExtractConnectionOptions(server->options,
|
||||
keywords + n, values + n);
|
||||
n += ExtractConnectionOptions(user->options,
|
||||
keywords + n, values + n);
|
||||
|
||||
/*
|
||||
* Use pgfdw_application_name as application_name if set.
|
||||
*
|
||||
* PQconnectdbParams() processes the parameter arrays from start to
|
||||
* end. If any key word is repeated, the last value is used. Therefore
|
||||
* note that pgfdw_application_name must be added to the arrays after
|
||||
* options of ForeignServer are, so that it can override
|
||||
* application_name set in ForeignServer.
|
||||
*/
|
||||
if (pgfdw_application_name && *pgfdw_application_name != '\0')
|
||||
{
|
||||
keywords[n] = "application_name";
|
||||
values[n] = pgfdw_application_name;
|
||||
n++;
|
||||
}
|
||||
|
||||
/*
|
||||
* Search the parameter arrays to find application_name setting, and
|
||||
* replace escape sequences in it with status information if found.
|
||||
* The arrays are searched backwards because the last value is used if
|
||||
* application_name is repeatedly set.
|
||||
*/
|
||||
for (int i = n - 1; i >= 0; i--)
|
||||
{
|
||||
if (strcmp(keywords[i], "application_name") == 0 &&
|
||||
*(values[i]) != '\0')
|
||||
{
|
||||
/*
|
||||
* Use this application_name setting if it's not empty string
|
||||
* even after any escape sequences in it are replaced.
|
||||
*/
|
||||
appname = process_pgfdw_appname(values[i]);
|
||||
if (appname[0] != '\0')
|
||||
{
|
||||
values[i] = appname;
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* This empty application_name is not used, so we set
|
||||
* values[i] to NULL and keep searching the array to find the
|
||||
* next one.
|
||||
*/
|
||||
values[i] = NULL;
|
||||
pfree(appname);
|
||||
appname = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/* Use "postgres_fdw" as fallback_application_name */
|
||||
keywords[n] = "fallback_application_name";
|
||||
values[n] = "postgres_fdw";
|
||||
n++;
|
||||
|
||||
/* Set client_encoding so that libpq can convert encoding properly. */
|
||||
keywords[n] = "client_encoding";
|
||||
values[n] = GetDatabaseEncodingName();
|
||||
n++;
|
||||
|
||||
/* Add required SCRAM pass-through connection options if it's enabled. */
|
||||
if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
|
||||
{
|
||||
int len;
|
||||
int encoded_len;
|
||||
|
||||
keywords[n] = "scram_client_key";
|
||||
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
|
||||
/* don't forget the zero-terminator */
|
||||
values[n] = palloc0(len + 1);
|
||||
encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
|
||||
sizeof(MyProcPort->scram_ClientKey),
|
||||
(char *) values[n], len);
|
||||
if (encoded_len < 0)
|
||||
elog(ERROR, "could not encode SCRAM client key");
|
||||
n++;
|
||||
|
||||
keywords[n] = "scram_server_key";
|
||||
len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
|
||||
/* don't forget the zero-terminator */
|
||||
values[n] = palloc0(len + 1);
|
||||
encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
|
||||
sizeof(MyProcPort->scram_ServerKey),
|
||||
(char *) values[n], len);
|
||||
if (encoded_len < 0)
|
||||
elog(ERROR, "could not encode SCRAM server key");
|
||||
n++;
|
||||
|
||||
/*
|
||||
* Require scram-sha-256 to ensure that no other auth method is
|
||||
* used when connecting with foreign server.
|
||||
*/
|
||||
keywords[n] = "require_auth";
|
||||
values[n] = "scram-sha-256";
|
||||
n++;
|
||||
}
|
||||
|
||||
keywords[n] = values[n] = NULL;
|
||||
|
||||
/* Verify the set of connection parameters. */
|
||||
check_conn_params(keywords, values, user);
|
||||
construct_connection_params(server, user, &keywords, &values, &appname);
|
||||
|
||||
/* first time, allocate or get the custom wait event */
|
||||
if (pgfdw_we_connect == 0)
|
||||
|
|
@ -2310,6 +2329,56 @@ postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Values in connection strings must be enclosed in single quotes. Single
|
||||
* quotes and backslashes must be escaped with backslash. NB: these rules are
|
||||
* different from the rules for escaping a SQL literal.
|
||||
*/
|
||||
static void
|
||||
appendEscapedValue(StringInfo str, const char *val)
|
||||
{
|
||||
appendStringInfoChar(str, '\'');
|
||||
for (int i = 0; val[i] != '\0'; i++)
|
||||
{
|
||||
if (val[i] == '\\' || val[i] == '\'')
|
||||
appendStringInfoChar(str, '\\');
|
||||
appendStringInfoChar(str, val[i]);
|
||||
}
|
||||
appendStringInfoChar(str, '\'');
|
||||
}
|
||||
|
||||
Datum
|
||||
postgres_fdw_connection(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid userid = PG_GETARG_OID(0);
|
||||
Oid serverid = PG_GETARG_OID(1);
|
||||
ForeignServer *server = GetForeignServer(serverid);
|
||||
UserMapping *user = GetUserMapping(userid, serverid);
|
||||
StringInfoData str;
|
||||
const char **keywords;
|
||||
const char **values;
|
||||
char *appname;
|
||||
char *sep = "";
|
||||
|
||||
construct_connection_params(server, user, &keywords, &values, &appname);
|
||||
|
||||
initStringInfo(&str);
|
||||
for (int i = 0; keywords[i] != NULL; i++)
|
||||
{
|
||||
if (values[i] == NULL)
|
||||
continue;
|
||||
appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
|
||||
appendEscapedValue(&str, values[i]);
|
||||
sep = " ";
|
||||
}
|
||||
|
||||
if (appname != NULL)
|
||||
pfree(appname);
|
||||
pfree(keywords);
|
||||
pfree(values);
|
||||
PG_RETURN_TEXT_P(cstring_to_text(str.data));
|
||||
}
|
||||
|
||||
/*
|
||||
* List active foreign server connections.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -255,6 +255,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
|
|||
ANALYZE ft1;
|
||||
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
|
||||
-- ===================================================================
|
||||
-- test subscription
|
||||
-- ===================================================================
|
||||
CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
|
||||
PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
|
||||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
DROP SUBSCRIPTION regress_pgfdw_subscription;
|
||||
-- ===================================================================
|
||||
-- test error case for create publication on foreign table
|
||||
-- ===================================================================
|
||||
CREATE PUBLICATION testpub_ftbl FOR TABLE ft1; -- should fail
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ install_data(
|
|||
'postgres_fdw--1.0.sql',
|
||||
'postgres_fdw--1.0--1.1.sql',
|
||||
'postgres_fdw--1.1--1.2.sql',
|
||||
'postgres_fdw--1.2--1.3.sql',
|
||||
kwargs: contrib_data_args,
|
||||
)
|
||||
|
||||
|
|
@ -50,6 +51,7 @@ tests += {
|
|||
'tap': {
|
||||
'tests': [
|
||||
't/001_auth_scram.pl',
|
||||
't/010_subscription.pl',
|
||||
],
|
||||
},
|
||||
}
|
||||
|
|
|
|||
12
contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql
Normal file
12
contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql
Normal file
|
|
@ -0,0 +1,12 @@
|
|||
/* contrib/postgres_fdw/postgres_fdw--1.2--1.3.sql */
|
||||
|
||||
-- complain if script is sourced in psql, rather than via ALTER EXTENSION
|
||||
\echo Use "ALTER EXTENSION postgres_fdw UPDATE TO '1.3'" to load this file. \quit
|
||||
|
||||
-- takes internal parameter to prevent calling from SQL
|
||||
CREATE FUNCTION postgres_fdw_connection(oid, oid, internal)
|
||||
RETURNS text
|
||||
AS 'MODULE_PATHNAME'
|
||||
LANGUAGE C STRICT PARALLEL RESTRICTED;
|
||||
|
||||
ALTER FOREIGN DATA WRAPPER postgres_fdw CONNECTION postgres_fdw_connection;
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
# postgres_fdw extension
|
||||
comment = 'foreign-data wrapper for remote PostgreSQL servers'
|
||||
default_version = '1.2'
|
||||
default_version = '1.3'
|
||||
module_pathname = '$libdir/postgres_fdw'
|
||||
relocatable = true
|
||||
|
|
|
|||
|
|
@ -244,6 +244,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
|
|||
ANALYZE ft1;
|
||||
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
|
||||
|
||||
-- ===================================================================
|
||||
-- test subscription
|
||||
-- ===================================================================
|
||||
CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
|
||||
PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
|
||||
DROP SUBSCRIPTION regress_pgfdw_subscription;
|
||||
|
||||
-- ===================================================================
|
||||
-- test error case for create publication on foreign table
|
||||
-- ===================================================================
|
||||
|
|
|
|||
71
contrib/postgres_fdw/t/010_subscription.pl
Normal file
71
contrib/postgres_fdw/t/010_subscription.pl
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
|
||||
# Copyright (c) 2021-2026, PostgreSQL Global Development Group
|
||||
|
||||
# Basic logical replication test
|
||||
use strict;
|
||||
use warnings FATAL => 'all';
|
||||
use PostgreSQL::Test::Cluster;
|
||||
use PostgreSQL::Test::Utils;
|
||||
use Test::More;
|
||||
|
||||
# Initialize publisher node
|
||||
my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
|
||||
$node_publisher->init(allows_streaming => 'logical');
|
||||
$node_publisher->start;
|
||||
|
||||
# Create subscriber node
|
||||
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
|
||||
$node_subscriber->init;
|
||||
$node_subscriber->start;
|
||||
|
||||
# Create some preexisting content on publisher
|
||||
$node_publisher->safe_psql('postgres',
|
||||
"CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a");
|
||||
|
||||
# Replicate the changes without columns
|
||||
$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()");
|
||||
$node_publisher->safe_psql('postgres',
|
||||
"INSERT INTO tab_no_col default VALUES");
|
||||
|
||||
# Setup structure on subscriber
|
||||
$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
|
||||
$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)");
|
||||
|
||||
# Setup logical replication
|
||||
my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
|
||||
$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
|
||||
|
||||
my $publisher_host = $node_publisher->host;
|
||||
my $publisher_port = $node_publisher->port;
|
||||
$node_subscriber->safe_psql('postgres',
|
||||
"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
|
||||
);
|
||||
|
||||
$node_subscriber->safe_psql('postgres',
|
||||
"CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
|
||||
);
|
||||
|
||||
$node_subscriber->safe_psql('postgres',
|
||||
"CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')"
|
||||
);
|
||||
$node_subscriber->safe_psql('postgres',
|
||||
"CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
|
||||
);
|
||||
|
||||
# Wait for initial table sync to finish
|
||||
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
|
||||
|
||||
my $result =
|
||||
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
|
||||
is($result, qq(1002), 'check that initial data was copied to subscriber');
|
||||
|
||||
$node_publisher->safe_psql('postgres',
|
||||
"INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a");
|
||||
|
||||
$node_publisher->wait_for_catchup('tap_sub');
|
||||
|
||||
$result =
|
||||
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
|
||||
is($result, qq(1050), 'check that inserted data was copied to subscriber');
|
||||
|
||||
done_testing();
|
||||
|
|
@ -2577,7 +2577,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
|
|||
<para>
|
||||
To create a subscription, the user must have the privileges of
|
||||
the <literal>pg_create_subscription</literal> role, as well as
|
||||
<literal>CREATE</literal> privileges on the database.
|
||||
<literal>CREATE</literal> privileges on the database. If
|
||||
<literal>SERVER</literal> is specified, the user also must have
|
||||
<literal>USAGE</literal> privileges on the server.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
|
|
|
|||
|
|
@ -1049,6 +1049,32 @@ postgres=# SELECT postgres_fdw_disconnect_all();
|
|||
</para>
|
||||
</sect2>
|
||||
|
||||
<sect2 id="postgres-fdw-server-subscription">
|
||||
<title>Subscription Management</title>
|
||||
|
||||
<para>
|
||||
<filename>postgres_fdw</filename> supports subscription connections using
|
||||
the same options described in <xref
|
||||
linkend="postgres-fdw-options-connection"/>.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
For example, assuming the remote server <literal>foreign-host</literal> has
|
||||
a publication <literal>testpub</literal>:
|
||||
<programlisting>
|
||||
CREATE SERVER subscription_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host 'foreign-host', dbname 'foreign_db');
|
||||
CREATE USER MAPPING FOR local_user SERVER subscription_server OPTIONS (user 'foreign_user', password 'password');
|
||||
CREATE SUBSCRIPTION my_subscription SERVER subscription_server PUBLICATION testpub;
|
||||
</programlisting>
|
||||
</para>
|
||||
|
||||
<para>
|
||||
To create a subscription, the user must be a member of the <xref
|
||||
linkend="predefined-role-pg-create-subscription"/> role and have
|
||||
<literal>USAGE</literal> privileges on the server.
|
||||
</para>
|
||||
</sect2>
|
||||
|
||||
<sect2 id="postgres-fdw-transaction-management">
|
||||
<title>Transaction Management</title>
|
||||
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ PostgreSQL documentation
|
|||
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
|
||||
[ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ]
|
||||
[ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ]
|
||||
[ CONNECTION <replaceable class="parameter">connection_function</replaceable> | NO CONNECTION ]
|
||||
[ OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ]) ]
|
||||
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
|
||||
ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
|
||||
|
|
@ -112,6 +113,25 @@ ALTER FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable> REN
|
|||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><literal>CONNECTION <replaceable class="parameter">connection_function</replaceable></literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
Specifies a new connection function for the foreign-data wrapper.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><literal>NO CONNECTION</literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
This is used to specify that the foreign-data wrapper should no
|
||||
longer have a connection function.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><literal>OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ] )</literal></term>
|
||||
<listitem>
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ PostgreSQL documentation
|
|||
|
||||
<refsynopsisdiv>
|
||||
<synopsis>
|
||||
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
|
||||
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
|
||||
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
|
||||
ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
|
||||
|
|
@ -102,13 +103,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
|
|||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="sql-altersubscription-params-server">
|
||||
<term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
This clause replaces the foreign server or connection string originally
|
||||
set by <xref linkend="sql-createsubscription"/> with the foreign server
|
||||
<replaceable>servername</replaceable>.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="sql-altersubscription-params-connection">
|
||||
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
This clause replaces the connection string originally set by
|
||||
<xref linkend="sql-createsubscription"/>. See there for more
|
||||
information.
|
||||
This clause replaces the foreign server or connection string originally
|
||||
set by <xref linkend="sql-createsubscription"/> with the connection
|
||||
string <replaceable>conninfo</replaceable>.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ PostgreSQL documentation
|
|||
CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
|
||||
[ HANDLER <replaceable class="parameter">handler_function</replaceable> | NO HANDLER ]
|
||||
[ VALIDATOR <replaceable class="parameter">validator_function</replaceable> | NO VALIDATOR ]
|
||||
[ CONNECTION <replaceable class="parameter">connection_function</replaceable> | NO CONNECTION ]
|
||||
[ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
|
||||
</synopsis>
|
||||
</refsynopsisdiv>
|
||||
|
|
@ -99,6 +100,25 @@ CREATE FOREIGN DATA WRAPPER <replaceable class="parameter">name</replaceable>
|
|||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><literal>CONNECTION <replaceable class="parameter">connection_function</replaceable></literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
<replaceable class="parameter">connection_function</replaceable> is the
|
||||
name of a previously registered function that will be called to generate
|
||||
the postgres connection string when a foreign server is used as part of
|
||||
<xref linkend="sql-createsubscription"/>. If no connection function or
|
||||
<literal>NO CONNECTION</literal> is specified, then servers using this
|
||||
foreign data wrapper cannot be used for <literal>CREATE
|
||||
SUBSCRIPTION</literal>. The connection function must take three
|
||||
arguments: one of type <type>oid</type> for the user, one of type
|
||||
<type>oid</type> for the server, and an unused third argument of type
|
||||
<type>internal</type> (which prevents calling the function in other
|
||||
contexts).
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry>
|
||||
<term><literal>OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] )</literal></term>
|
||||
<listitem>
|
||||
|
|
|
|||
|
|
@ -42,6 +42,13 @@ CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</repl
|
|||
means of user mappings.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
If the foreign data wrapper <replaceable>fdw_name</replaceable> is
|
||||
specified with a <literal>CONNECTION</literal> clause, then <xref
|
||||
linkend="sql-createsubscription"/> may use this foreign server for
|
||||
connection information.
|
||||
</para>
|
||||
|
||||
<para>
|
||||
The server name must be unique within the database.
|
||||
</para>
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ PostgreSQL documentation
|
|||
<refsynopsisdiv>
|
||||
<synopsis>
|
||||
CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
|
||||
CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
|
||||
{ SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
|
||||
PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
|
||||
[ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
|
||||
</synopsis>
|
||||
|
|
@ -77,6 +77,20 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
|
|||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="sql-createsubscription-params-server">
|
||||
<term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
|
||||
<listitem>
|
||||
<para>
|
||||
A foreign server to use for the connection. The server's foreign data
|
||||
wrapper must have a <replaceable>connection_function</replaceable>
|
||||
registered, and a user mapping for the subscription owner on the server
|
||||
must exist. Additionally, the subscription owner must have
|
||||
<literal>USAGE</literal> privileges on
|
||||
<replaceable>servername</replaceable>.
|
||||
</para>
|
||||
</listitem>
|
||||
</varlistentry>
|
||||
|
||||
<varlistentry id="sql-createsubscription-params-connection">
|
||||
<term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
|
||||
<listitem>
|
||||
|
|
|
|||
|
|
@ -895,6 +895,17 @@ findDependentObjects(const ObjectAddress *object,
|
|||
object->objectSubId == 0)
|
||||
continue;
|
||||
|
||||
/*
|
||||
* Check that the dependent object is not in a shared catalog, which
|
||||
* is not supported by doDeletion().
|
||||
*/
|
||||
if (IsSharedRelation(otherObject.classId))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
|
||||
errmsg("cannot drop %s because %s depends on it",
|
||||
getObjectDescription(object, false),
|
||||
getObjectDescription(&otherObject, false))));
|
||||
|
||||
/*
|
||||
* Must lock the dependent object before recursing to it.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -19,11 +19,14 @@
|
|||
#include "access/htup_details.h"
|
||||
#include "access/tableam.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/pg_foreign_server.h"
|
||||
#include "catalog/pg_subscription.h"
|
||||
#include "catalog/pg_subscription_rel.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "foreign/foreign.h"
|
||||
#include "miscadmin.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "utils/acl.h"
|
||||
#include "utils/array.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgroids.h"
|
||||
|
|
@ -69,7 +72,7 @@ GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
|
|||
* Fetch the subscription from the syscache.
|
||||
*/
|
||||
Subscription *
|
||||
GetSubscription(Oid subid, bool missing_ok)
|
||||
GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
|
||||
{
|
||||
HeapTuple tup;
|
||||
Subscription *sub;
|
||||
|
|
@ -108,10 +111,35 @@ GetSubscription(Oid subid, bool missing_ok)
|
|||
sub->retentionactive = subform->subretentionactive;
|
||||
|
||||
/* Get conninfo */
|
||||
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
|
||||
tup,
|
||||
Anum_pg_subscription_subconninfo);
|
||||
sub->conninfo = TextDatumGetCString(datum);
|
||||
if (OidIsValid(subform->subserver))
|
||||
{
|
||||
AclResult aclresult;
|
||||
|
||||
/* recheck ACL if requested */
|
||||
if (aclcheck)
|
||||
{
|
||||
aclresult = object_aclcheck(ForeignServerRelationId,
|
||||
subform->subserver,
|
||||
subform->subowner, ACL_USAGE);
|
||||
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
|
||||
GetUserNameFromId(subform->subowner, false),
|
||||
ForeignServerName(subform->subserver))));
|
||||
}
|
||||
|
||||
sub->conninfo = ForeignServerConnectionString(subform->subowner,
|
||||
subform->subserver);
|
||||
}
|
||||
else
|
||||
{
|
||||
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
|
||||
tup,
|
||||
Anum_pg_subscription_subconninfo);
|
||||
sub->conninfo = TextDatumGetCString(datum);
|
||||
}
|
||||
|
||||
/* Get slotname */
|
||||
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
|
||||
|
|
|
|||
|
|
@ -1449,7 +1449,7 @@ GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
|
|||
subbinary, substream, subtwophasestate, subdisableonerr,
|
||||
subpasswordrequired, subrunasowner, subfailover,
|
||||
subretaindeadtuples, submaxretention, subretentionactive,
|
||||
subslotname, subsynccommit, subpublications, suborigin)
|
||||
subserver, subslotname, subsynccommit, subpublications, suborigin)
|
||||
ON pg_subscription TO public;
|
||||
|
||||
CREATE VIEW pg_stat_subscription_stats AS
|
||||
|
|
|
|||
|
|
@ -522,21 +522,53 @@ lookup_fdw_validator_func(DefElem *validator)
|
|||
/* validator's return value is ignored, so we don't check the type */
|
||||
}
|
||||
|
||||
/*
|
||||
* Convert a connection string function name passed from the parser to an Oid.
|
||||
*/
|
||||
static Oid
|
||||
lookup_fdw_connection_func(DefElem *connection)
|
||||
{
|
||||
Oid connectionOid;
|
||||
Oid funcargtypes[3];
|
||||
|
||||
if (connection == NULL || connection->arg == NULL)
|
||||
return InvalidOid;
|
||||
|
||||
/* connection string functions take user oid, server oid */
|
||||
funcargtypes[0] = OIDOID;
|
||||
funcargtypes[1] = OIDOID;
|
||||
funcargtypes[2] = INTERNALOID;
|
||||
|
||||
connectionOid = LookupFuncName((List *) connection->arg, 3, funcargtypes, false);
|
||||
|
||||
/* check that connection string function has correct return type */
|
||||
if (get_func_rettype(connectionOid) != TEXTOID)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("function %s must return type %s",
|
||||
NameListToString((List *) connection->arg), "text")));
|
||||
|
||||
return connectionOid;
|
||||
}
|
||||
|
||||
/*
|
||||
* Process function options of CREATE/ALTER FDW
|
||||
*/
|
||||
static void
|
||||
parse_func_options(ParseState *pstate, List *func_options,
|
||||
bool *handler_given, Oid *fdwhandler,
|
||||
bool *validator_given, Oid *fdwvalidator)
|
||||
bool *validator_given, Oid *fdwvalidator,
|
||||
bool *connection_given, Oid *fdwconnection)
|
||||
{
|
||||
ListCell *cell;
|
||||
|
||||
*handler_given = false;
|
||||
*validator_given = false;
|
||||
*connection_given = false;
|
||||
/* return InvalidOid if not given */
|
||||
*fdwhandler = InvalidOid;
|
||||
*fdwvalidator = InvalidOid;
|
||||
*fdwconnection = InvalidOid;
|
||||
|
||||
foreach(cell, func_options)
|
||||
{
|
||||
|
|
@ -556,6 +588,13 @@ parse_func_options(ParseState *pstate, List *func_options,
|
|||
*validator_given = true;
|
||||
*fdwvalidator = lookup_fdw_validator_func(def);
|
||||
}
|
||||
else if (strcmp(def->defname, "connection") == 0)
|
||||
{
|
||||
if (*connection_given)
|
||||
errorConflictingDefElem(def, pstate);
|
||||
*connection_given = true;
|
||||
*fdwconnection = lookup_fdw_connection_func(def);
|
||||
}
|
||||
else
|
||||
elog(ERROR, "option \"%s\" not recognized",
|
||||
def->defname);
|
||||
|
|
@ -575,8 +614,10 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
|
|||
Oid fdwId;
|
||||
bool handler_given;
|
||||
bool validator_given;
|
||||
bool connection_given;
|
||||
Oid fdwhandler;
|
||||
Oid fdwvalidator;
|
||||
Oid fdwconnection;
|
||||
Datum fdwoptions;
|
||||
Oid ownerId;
|
||||
ObjectAddress myself;
|
||||
|
|
@ -620,10 +661,12 @@ CreateForeignDataWrapper(ParseState *pstate, CreateFdwStmt *stmt)
|
|||
/* Lookup handler and validator functions, if given */
|
||||
parse_func_options(pstate, stmt->func_options,
|
||||
&handler_given, &fdwhandler,
|
||||
&validator_given, &fdwvalidator);
|
||||
&validator_given, &fdwvalidator,
|
||||
&connection_given, &fdwconnection);
|
||||
|
||||
values[Anum_pg_foreign_data_wrapper_fdwhandler - 1] = ObjectIdGetDatum(fdwhandler);
|
||||
values[Anum_pg_foreign_data_wrapper_fdwvalidator - 1] = ObjectIdGetDatum(fdwvalidator);
|
||||
values[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
|
||||
|
||||
nulls[Anum_pg_foreign_data_wrapper_fdwacl - 1] = true;
|
||||
|
||||
|
|
@ -695,8 +738,10 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
|
|||
Datum datum;
|
||||
bool handler_given;
|
||||
bool validator_given;
|
||||
bool connection_given;
|
||||
Oid fdwhandler;
|
||||
Oid fdwvalidator;
|
||||
Oid fdwconnection;
|
||||
ObjectAddress myself;
|
||||
|
||||
rel = table_open(ForeignDataWrapperRelationId, RowExclusiveLock);
|
||||
|
|
@ -726,7 +771,8 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
|
|||
|
||||
parse_func_options(pstate, stmt->func_options,
|
||||
&handler_given, &fdwhandler,
|
||||
&validator_given, &fdwvalidator);
|
||||
&validator_given, &fdwvalidator,
|
||||
&connection_given, &fdwconnection);
|
||||
|
||||
if (handler_given)
|
||||
{
|
||||
|
|
@ -764,6 +810,12 @@ AlterForeignDataWrapper(ParseState *pstate, AlterFdwStmt *stmt)
|
|||
fdwvalidator = fdwForm->fdwvalidator;
|
||||
}
|
||||
|
||||
if (connection_given)
|
||||
{
|
||||
repl_val[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = ObjectIdGetDatum(fdwconnection);
|
||||
repl_repl[Anum_pg_foreign_data_wrapper_fdwconnection - 1] = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* If options specified, validate and update.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -27,13 +27,16 @@
|
|||
#include "catalog/objectaddress.h"
|
||||
#include "catalog/pg_authid_d.h"
|
||||
#include "catalog/pg_database_d.h"
|
||||
#include "catalog/pg_foreign_server.h"
|
||||
#include "catalog/pg_subscription.h"
|
||||
#include "catalog/pg_subscription_rel.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "catalog/pg_user_mapping.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/event_trigger.h"
|
||||
#include "commands/subscriptioncmds.h"
|
||||
#include "executor/executor.h"
|
||||
#include "foreign/foreign.h"
|
||||
#include "miscadmin.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "pgstat.h"
|
||||
|
|
@ -619,6 +622,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
|||
Datum values[Natts_pg_subscription];
|
||||
Oid owner = GetUserId();
|
||||
HeapTuple tup;
|
||||
Oid serverid;
|
||||
char *conninfo;
|
||||
char originname[NAMEDATALEN];
|
||||
List *publications;
|
||||
|
|
@ -730,15 +734,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
|||
if (opts.wal_receiver_timeout == NULL)
|
||||
opts.wal_receiver_timeout = "-1";
|
||||
|
||||
conninfo = stmt->conninfo;
|
||||
publications = stmt->publication;
|
||||
|
||||
/* Load the library providing us libpq calls. */
|
||||
load_file("libpqwalreceiver", false);
|
||||
|
||||
if (stmt->servername)
|
||||
{
|
||||
ForeignServer *server;
|
||||
|
||||
Assert(!stmt->conninfo);
|
||||
conninfo = NULL;
|
||||
|
||||
server = GetForeignServerByName(stmt->servername, false);
|
||||
aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
|
||||
|
||||
/* make sure a user mapping exists */
|
||||
GetUserMapping(owner, server->serverid);
|
||||
|
||||
serverid = server->serverid;
|
||||
conninfo = ForeignServerConnectionString(owner, serverid);
|
||||
}
|
||||
else
|
||||
{
|
||||
Assert(stmt->conninfo);
|
||||
|
||||
serverid = InvalidOid;
|
||||
conninfo = stmt->conninfo;
|
||||
}
|
||||
|
||||
/* Check the connection info string. */
|
||||
walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
|
||||
|
||||
publications = stmt->publication;
|
||||
|
||||
/* Everything ok, form a new tuple. */
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(nulls, false, sizeof(nulls));
|
||||
|
|
@ -768,8 +797,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
|||
Int32GetDatum(opts.maxretention);
|
||||
values[Anum_pg_subscription_subretentionactive - 1] =
|
||||
Int32GetDatum(opts.retaindeadtuples);
|
||||
values[Anum_pg_subscription_subconninfo - 1] =
|
||||
CStringGetTextDatum(conninfo);
|
||||
values[Anum_pg_subscription_subserver - 1] = serverid;
|
||||
if (!OidIsValid(serverid))
|
||||
values[Anum_pg_subscription_subconninfo - 1] =
|
||||
CStringGetTextDatum(conninfo);
|
||||
else
|
||||
nulls[Anum_pg_subscription_subconninfo - 1] = true;
|
||||
if (opts.slot_name)
|
||||
values[Anum_pg_subscription_subslotname - 1] =
|
||||
DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
|
||||
|
|
@ -792,6 +825,18 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
|||
|
||||
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
|
||||
|
||||
ObjectAddressSet(myself, SubscriptionRelationId, subid);
|
||||
|
||||
if (stmt->servername)
|
||||
{
|
||||
ObjectAddress referenced;
|
||||
|
||||
Assert(OidIsValid(serverid));
|
||||
|
||||
ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
|
||||
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
|
||||
}
|
||||
|
||||
/*
|
||||
* A replication origin is currently created for all subscriptions,
|
||||
* including those that only contain sequences or are otherwise empty.
|
||||
|
|
@ -945,8 +990,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
|
|||
if (opts.enabled || opts.retaindeadtuples)
|
||||
ApplyLauncherWakeupAtCommit();
|
||||
|
||||
ObjectAddressSet(myself, SubscriptionRelationId, subid);
|
||||
|
||||
InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
|
||||
|
||||
return myself;
|
||||
|
|
@ -1410,7 +1453,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
|||
aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
|
||||
stmt->subname);
|
||||
|
||||
sub = GetSubscription(subid, false);
|
||||
/*
|
||||
* Skip ACL checks on the subscription's foreign server, if any. If
|
||||
* changing the server (or replacing it with a raw connection), then the
|
||||
* old one will be removed anyway. If changing something unrelated,
|
||||
* there's no need to do an additional ACL check here; that will be done
|
||||
* by the subscription worker anyway.
|
||||
*/
|
||||
sub = GetSubscription(subid, false, false);
|
||||
|
||||
retain_dead_tuples = sub->retaindeadtuples;
|
||||
origin = sub->origin;
|
||||
|
|
@ -1435,6 +1485,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
|||
memset(nulls, false, sizeof(nulls));
|
||||
memset(replaces, false, sizeof(replaces));
|
||||
|
||||
ObjectAddressSet(myself, SubscriptionRelationId, subid);
|
||||
|
||||
switch (stmt->kind)
|
||||
{
|
||||
case ALTER_SUBSCRIPTION_OPTIONS:
|
||||
|
|
@ -1753,7 +1805,78 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
|||
break;
|
||||
}
|
||||
|
||||
case ALTER_SUBSCRIPTION_SERVER:
|
||||
{
|
||||
ForeignServer *new_server;
|
||||
ObjectAddress referenced;
|
||||
AclResult aclresult;
|
||||
char *conninfo;
|
||||
|
||||
/*
|
||||
* Remove what was there before, either another foreign server
|
||||
* or a connection string.
|
||||
*/
|
||||
if (form->subserver)
|
||||
{
|
||||
deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
|
||||
DEPENDENCY_NORMAL,
|
||||
ForeignServerRelationId, form->subserver);
|
||||
}
|
||||
else
|
||||
{
|
||||
nulls[Anum_pg_subscription_subconninfo - 1] = true;
|
||||
replaces[Anum_pg_subscription_subconninfo - 1] = true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check that the subscription owner has USAGE privileges on
|
||||
* the server.
|
||||
*/
|
||||
new_server = GetForeignServerByName(stmt->servername, false);
|
||||
aclresult = object_aclcheck(ForeignServerRelationId,
|
||||
new_server->serverid,
|
||||
form->subowner, ACL_USAGE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
|
||||
GetUserNameFromId(form->subowner, false),
|
||||
ForeignServerName(new_server->serverid)));
|
||||
|
||||
/* make sure a user mapping exists */
|
||||
GetUserMapping(form->subowner, new_server->serverid);
|
||||
|
||||
conninfo = ForeignServerConnectionString(form->subowner,
|
||||
new_server->serverid);
|
||||
|
||||
/* Load the library providing us libpq calls. */
|
||||
load_file("libpqwalreceiver", false);
|
||||
/* Check the connection info string. */
|
||||
walrcv_check_conninfo(conninfo,
|
||||
sub->passwordrequired && !sub->ownersuperuser);
|
||||
|
||||
values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
|
||||
replaces[Anum_pg_subscription_subserver - 1] = true;
|
||||
|
||||
ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
|
||||
recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
|
||||
|
||||
update_tuple = true;
|
||||
}
|
||||
break;
|
||||
|
||||
case ALTER_SUBSCRIPTION_CONNECTION:
|
||||
/* remove reference to foreign server and dependencies, if present */
|
||||
if (form->subserver)
|
||||
{
|
||||
deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
|
||||
DEPENDENCY_NORMAL,
|
||||
ForeignServerRelationId, form->subserver);
|
||||
|
||||
values[Anum_pg_subscription_subserver - 1] = InvalidOid;
|
||||
replaces[Anum_pg_subscription_subserver - 1] = true;
|
||||
}
|
||||
|
||||
/* Load the library providing us libpq calls. */
|
||||
load_file("libpqwalreceiver", false);
|
||||
/* Check the connection info string. */
|
||||
|
|
@ -2038,8 +2161,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
|
|||
|
||||
table_close(rel, RowExclusiveLock);
|
||||
|
||||
ObjectAddressSet(myself, SubscriptionRelationId, subid);
|
||||
|
||||
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
|
||||
|
||||
/* Wake up related replication workers to handle this change quickly. */
|
||||
|
|
@ -2068,7 +2189,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
|
|||
ListCell *lc;
|
||||
char originname[NAMEDATALEN];
|
||||
char *err = NULL;
|
||||
WalReceiverConn *wrconn;
|
||||
WalReceiverConn *wrconn = NULL;
|
||||
Form_pg_subscription form;
|
||||
List *rstates;
|
||||
bool must_use_password;
|
||||
|
|
@ -2126,9 +2247,35 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
|
|||
subname = pstrdup(NameStr(*DatumGetName(datum)));
|
||||
|
||||
/* Get conninfo */
|
||||
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
|
||||
Anum_pg_subscription_subconninfo);
|
||||
conninfo = TextDatumGetCString(datum);
|
||||
if (OidIsValid(form->subserver))
|
||||
{
|
||||
AclResult aclresult;
|
||||
|
||||
aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
|
||||
form->subowner, ACL_USAGE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
{
|
||||
/*
|
||||
* Unable to generate connection string because permissions on the
|
||||
* foreign server have been removed. Follow the same logic as an
|
||||
* unusable subconninfo (which will result in an ERROR later
|
||||
* unless slot_name = NONE).
|
||||
*/
|
||||
err = psprintf(_("subscription owner \"%s\" does not have permission on foreign server \"%s\""),
|
||||
GetUserNameFromId(form->subowner, false),
|
||||
ForeignServerName(form->subserver));
|
||||
conninfo = NULL;
|
||||
}
|
||||
else
|
||||
conninfo = ForeignServerConnectionString(form->subowner,
|
||||
form->subserver);
|
||||
}
|
||||
else
|
||||
{
|
||||
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
|
||||
Anum_pg_subscription_subconninfo);
|
||||
conninfo = TextDatumGetCString(datum);
|
||||
}
|
||||
|
||||
/* Get slotname */
|
||||
datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
|
||||
|
|
@ -2227,6 +2374,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
|
|||
}
|
||||
|
||||
/* Clean up dependencies */
|
||||
deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
|
||||
deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
|
||||
|
||||
/* Remove any associated relation synchronization states. */
|
||||
|
|
@ -2265,8 +2413,10 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
|
|||
*/
|
||||
load_file("libpqwalreceiver", false);
|
||||
|
||||
wrconn = walrcv_connect(conninfo, true, true, must_use_password,
|
||||
subname, &err);
|
||||
if (conninfo)
|
||||
wrconn = walrcv_connect(conninfo, true, true, must_use_password,
|
||||
subname, &err);
|
||||
|
||||
if (wrconn == NULL)
|
||||
{
|
||||
if (!slotname)
|
||||
|
|
@ -2436,6 +2586,27 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
|
|||
aclcheck_error(aclresult, OBJECT_DATABASE,
|
||||
get_database_name(MyDatabaseId));
|
||||
|
||||
/*
|
||||
* If the subscription uses a server, check that the new owner has USAGE
|
||||
* privileges on the server and that a user mapping exists. Note: does not
|
||||
* re-check the resulting connection string.
|
||||
*/
|
||||
if (OidIsValid(form->subserver))
|
||||
{
|
||||
Oid serverid = form->subserver;
|
||||
|
||||
aclresult = object_aclcheck(ForeignServerRelationId, serverid, newOwnerId, ACL_USAGE);
|
||||
if (aclresult != ACLCHECK_OK)
|
||||
ereport(ERROR,
|
||||
errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
|
||||
errmsg("new subscription owner \"%s\" does not have permission on foreign server \"%s\"",
|
||||
GetUserNameFromId(newOwnerId, false),
|
||||
ForeignServerName(serverid)));
|
||||
|
||||
/* make sure a user mapping exists */
|
||||
GetUserMapping(newOwnerId, serverid);
|
||||
}
|
||||
|
||||
form->subowner = newOwnerId;
|
||||
CatalogTupleUpdate(rel, &tup->t_self, tup);
|
||||
|
||||
|
|
|
|||
|
|
@ -72,6 +72,7 @@ GetForeignDataWrapperExtended(Oid fdwid, bits16 flags)
|
|||
fdw->fdwname = pstrdup(NameStr(fdwform->fdwname));
|
||||
fdw->fdwhandler = fdwform->fdwhandler;
|
||||
fdw->fdwvalidator = fdwform->fdwvalidator;
|
||||
fdw->fdwconnection = fdwform->fdwconnection;
|
||||
|
||||
/* Extract the fdwoptions */
|
||||
datum = SysCacheGetAttr(FOREIGNDATAWRAPPEROID,
|
||||
|
|
@ -176,6 +177,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ForeignServerName - get name of foreign server.
|
||||
*/
|
||||
char *
|
||||
ForeignServerName(Oid serverid)
|
||||
{
|
||||
Form_pg_foreign_server serverform;
|
||||
char *servername;
|
||||
HeapTuple tp;
|
||||
|
||||
tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
|
||||
|
||||
if (!HeapTupleIsValid(tp))
|
||||
elog(ERROR, "cache lookup failed for foreign server %u", serverid);
|
||||
|
||||
serverform = (Form_pg_foreign_server) GETSTRUCT(tp);
|
||||
|
||||
servername = pstrdup(NameStr(serverform->srvname));
|
||||
|
||||
ReleaseSysCache(tp);
|
||||
|
||||
return servername;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetForeignServerByName - look up the foreign server definition by name.
|
||||
*/
|
||||
|
|
@ -191,6 +217,66 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* Retrieve connection string from server's FDW.
|
||||
*/
|
||||
char *
|
||||
ForeignServerConnectionString(Oid userid, Oid serverid)
|
||||
{
|
||||
MemoryContext tempContext;
|
||||
MemoryContext oldcxt;
|
||||
volatile text *connection_text = NULL;
|
||||
char *result = NULL;
|
||||
|
||||
/*
|
||||
* GetForeignServer, GetForeignDataWrapper, and the connection function
|
||||
* itself all leak memory into CurrentMemoryContext. Switch to a temporary
|
||||
* context for easy cleanup.
|
||||
*/
|
||||
tempContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"FDWConnectionContext",
|
||||
ALLOCSET_SMALL_SIZES);
|
||||
|
||||
oldcxt = MemoryContextSwitchTo(tempContext);
|
||||
|
||||
PG_TRY();
|
||||
{
|
||||
ForeignServer *server;
|
||||
ForeignDataWrapper *fdw;
|
||||
Datum connection_datum;
|
||||
|
||||
server = GetForeignServer(serverid);
|
||||
fdw = GetForeignDataWrapper(server->fdwid);
|
||||
|
||||
if (!OidIsValid(fdw->fdwconnection))
|
||||
ereport(ERROR,
|
||||
(errmsg("foreign data wrapper \"%s\" does not support subscription connections",
|
||||
fdw->fdwname),
|
||||
errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
|
||||
|
||||
|
||||
connection_datum = OidFunctionCall3(fdw->fdwconnection,
|
||||
ObjectIdGetDatum(userid),
|
||||
ObjectIdGetDatum(serverid),
|
||||
PointerGetDatum(NULL));
|
||||
|
||||
connection_text = DatumGetTextPP(connection_datum);
|
||||
}
|
||||
PG_FINALLY();
|
||||
{
|
||||
MemoryContextSwitchTo(oldcxt);
|
||||
|
||||
if (connection_text)
|
||||
result = text_to_cstring((text *) connection_text);
|
||||
|
||||
MemoryContextDelete(tempContext);
|
||||
}
|
||||
PG_END_TRY();
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetUserMapping - look up the user mapping.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -5583,6 +5583,8 @@ fdw_option:
|
|||
| NO HANDLER { $$ = makeDefElem("handler", NULL, @1); }
|
||||
| VALIDATOR handler_name { $$ = makeDefElem("validator", (Node *) $2, @1); }
|
||||
| NO VALIDATOR { $$ = makeDefElem("validator", NULL, @1); }
|
||||
| CONNECTION handler_name { $$ = makeDefElem("connection", (Node *) $2, @1); }
|
||||
| NO CONNECTION { $$ = makeDefElem("connection", NULL, @1); }
|
||||
;
|
||||
|
||||
fdw_options:
|
||||
|
|
@ -11057,6 +11059,16 @@ CreateSubscriptionStmt:
|
|||
n->options = $8;
|
||||
$$ = (Node *) n;
|
||||
}
|
||||
| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
|
||||
{
|
||||
CreateSubscriptionStmt *n =
|
||||
makeNode(CreateSubscriptionStmt);
|
||||
n->subname = $3;
|
||||
n->servername = $5;
|
||||
n->publication = $7;
|
||||
n->options = $8;
|
||||
$$ = (Node *) n;
|
||||
}
|
||||
;
|
||||
|
||||
/*****************************************************************************
|
||||
|
|
@ -11086,6 +11098,16 @@ AlterSubscriptionStmt:
|
|||
n->conninfo = $5;
|
||||
$$ = (Node *) n;
|
||||
}
|
||||
| ALTER SUBSCRIPTION name SERVER name
|
||||
{
|
||||
AlterSubscriptionStmt *n =
|
||||
makeNode(AlterSubscriptionStmt);
|
||||
|
||||
n->kind = ALTER_SUBSCRIPTION_SERVER;
|
||||
n->subname = $3;
|
||||
n->servername = $5;
|
||||
$$ = (Node *) n;
|
||||
}
|
||||
| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
|
||||
{
|
||||
AlterSubscriptionStmt *n =
|
||||
|
|
|
|||
|
|
@ -5059,7 +5059,7 @@ maybe_reread_subscription(void)
|
|||
/* Ensure allocations in permanent context. */
|
||||
oldctx = MemoryContextSwitchTo(ApplyContext);
|
||||
|
||||
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
|
||||
newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
|
||||
|
||||
/*
|
||||
* Exit if the subscription was removed. This normally should not happen
|
||||
|
|
@ -5201,7 +5201,9 @@ set_wal_receiver_timeout(void)
|
|||
}
|
||||
|
||||
/*
|
||||
* Callback from subscription syscache invalidation.
|
||||
* Callback from subscription syscache invalidation. Also needed for server or
|
||||
* user mapping invalidation, which can change the connection information for
|
||||
* subscriptions that connect using a server object.
|
||||
*/
|
||||
static void
|
||||
subscription_change_cb(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
|
||||
|
|
@ -5806,7 +5808,7 @@ InitializeLogRepWorker(void)
|
|||
*/
|
||||
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
|
||||
AccessShareLock);
|
||||
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
|
||||
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
|
||||
if (!MySubscription)
|
||||
{
|
||||
ereport(LOG,
|
||||
|
|
@ -5871,6 +5873,22 @@ InitializeLogRepWorker(void)
|
|||
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
|
||||
subscription_change_cb,
|
||||
(Datum) 0);
|
||||
/* Changes to foreign servers may affect subscriptions using SERVER. */
|
||||
CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
|
||||
subscription_change_cb,
|
||||
(Datum) 0);
|
||||
/* Changes to user mappings may affect subscriptions using SERVER. */
|
||||
CacheRegisterSyscacheCallback(USERMAPPINGOID,
|
||||
subscription_change_cb,
|
||||
(Datum) 0);
|
||||
|
||||
/*
|
||||
* Changes to FDW connection_function may affect subscriptions using
|
||||
* SERVER.
|
||||
*/
|
||||
CacheRegisterSyscacheCallback(FOREIGNDATAWRAPPEROID,
|
||||
subscription_change_cb,
|
||||
(Datum) 0);
|
||||
|
||||
CacheRegisterSyscacheCallback(AUTHOID,
|
||||
subscription_change_cb,
|
||||
|
|
|
|||
|
|
@ -5182,6 +5182,7 @@ getSubscriptions(Archive *fout)
|
|||
int i_subdisableonerr;
|
||||
int i_subpasswordrequired;
|
||||
int i_subrunasowner;
|
||||
int i_subservername;
|
||||
int i_subconninfo;
|
||||
int i_subslotname;
|
||||
int i_subsynccommit;
|
||||
|
|
@ -5286,14 +5287,24 @@ getSubscriptions(Archive *fout)
|
|||
|
||||
if (fout->remoteVersion >= 190000)
|
||||
appendPQExpBufferStr(query,
|
||||
" s.subwalrcvtimeout\n");
|
||||
" s.subwalrcvtimeout,\n");
|
||||
else
|
||||
appendPQExpBufferStr(query,
|
||||
" '-1' AS subwalrcvtimeout\n");
|
||||
" '-1' AS subwalrcvtimeout,\n");
|
||||
|
||||
if (fout->remoteVersion >= 190000)
|
||||
appendPQExpBufferStr(query, " fs.srvname AS subservername\n");
|
||||
else
|
||||
appendPQExpBufferStr(query, " NULL AS subservername\n");
|
||||
|
||||
appendPQExpBufferStr(query,
|
||||
"FROM pg_subscription s\n");
|
||||
|
||||
if (fout->remoteVersion >= 190000)
|
||||
appendPQExpBufferStr(query,
|
||||
"LEFT JOIN pg_catalog.pg_foreign_server fs \n"
|
||||
" ON fs.oid = s.subserver \n");
|
||||
|
||||
if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
|
||||
appendPQExpBufferStr(query,
|
||||
"LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
|
||||
|
|
@ -5325,6 +5336,7 @@ getSubscriptions(Archive *fout)
|
|||
i_subfailover = PQfnumber(res, "subfailover");
|
||||
i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
|
||||
i_submaxretention = PQfnumber(res, "submaxretention");
|
||||
i_subservername = PQfnumber(res, "subservername");
|
||||
i_subconninfo = PQfnumber(res, "subconninfo");
|
||||
i_subslotname = PQfnumber(res, "subslotname");
|
||||
i_subsynccommit = PQfnumber(res, "subsynccommit");
|
||||
|
|
@ -5347,6 +5359,10 @@ getSubscriptions(Archive *fout)
|
|||
|
||||
subinfo[i].subenabled =
|
||||
(strcmp(PQgetvalue(res, i, i_subenabled), "t") == 0);
|
||||
if (PQgetisnull(res, i, i_subservername))
|
||||
subinfo[i].subservername = NULL;
|
||||
else
|
||||
subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
|
||||
subinfo[i].subbinary =
|
||||
(strcmp(PQgetvalue(res, i, i_subbinary), "t") == 0);
|
||||
subinfo[i].substream = *(PQgetvalue(res, i, i_substream));
|
||||
|
|
@ -5363,8 +5379,11 @@ getSubscriptions(Archive *fout)
|
|||
(strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
|
||||
subinfo[i].submaxretention =
|
||||
atoi(PQgetvalue(res, i, i_submaxretention));
|
||||
subinfo[i].subconninfo =
|
||||
pg_strdup(PQgetvalue(res, i, i_subconninfo));
|
||||
if (PQgetisnull(res, i, i_subconninfo))
|
||||
subinfo[i].subconninfo = NULL;
|
||||
else
|
||||
subinfo[i].subconninfo =
|
||||
pg_strdup(PQgetvalue(res, i, i_subconninfo));
|
||||
if (PQgetisnull(res, i, i_subslotname))
|
||||
subinfo[i].subslotname = NULL;
|
||||
else
|
||||
|
|
@ -5575,9 +5594,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
|
|||
appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
|
||||
qsubname);
|
||||
|
||||
appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
|
||||
appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
|
||||
qsubname);
|
||||
appendStringLiteralAH(query, subinfo->subconninfo, fout);
|
||||
if (subinfo->subservername)
|
||||
{
|
||||
appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
|
||||
}
|
||||
else
|
||||
{
|
||||
appendPQExpBuffer(query, "CONNECTION ");
|
||||
appendStringLiteralAH(query, subinfo->subconninfo, fout);
|
||||
}
|
||||
|
||||
/* Build list of quoted publications and append them to query. */
|
||||
if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
|
||||
|
|
|
|||
|
|
@ -720,6 +720,7 @@ typedef struct _SubscriptionInfo
|
|||
bool subfailover;
|
||||
bool subretaindeadtuples;
|
||||
int submaxretention;
|
||||
char *subservername;
|
||||
char *subconninfo;
|
||||
char *subslotname;
|
||||
char *subsynccommit;
|
||||
|
|
|
|||
|
|
@ -6895,7 +6895,7 @@ describeSubscriptions(const char *pattern, bool verbose)
|
|||
printQueryOpt myopt = pset.popt;
|
||||
static const bool translate_columns[] = {false, false, false, false,
|
||||
false, false, false, false, false, false, false, false, false, false,
|
||||
false, false, false, false, false, false};
|
||||
false, false, false, false, false, false, false};
|
||||
|
||||
if (pset.sversion < 100000)
|
||||
{
|
||||
|
|
@ -6965,6 +6965,10 @@ describeSubscriptions(const char *pattern, bool verbose)
|
|||
gettext_noop("Failover"));
|
||||
if (pset.sversion >= 190000)
|
||||
{
|
||||
appendPQExpBuffer(&buf,
|
||||
", (select srvname from pg_foreign_server where oid=subserver) AS \"%s\"\n",
|
||||
gettext_noop("Server"));
|
||||
|
||||
appendPQExpBuffer(&buf,
|
||||
", subretaindeadtuples AS \"%s\"\n",
|
||||
gettext_noop("Retain dead tuples"));
|
||||
|
|
|
|||
|
|
@ -2332,7 +2332,7 @@ match_previous_words(int pattern_id,
|
|||
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
|
||||
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
|
||||
"RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES",
|
||||
"SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
|
||||
"SERVER", "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
|
||||
/* ALTER SUBSCRIPTION <name> REFRESH */
|
||||
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH"))
|
||||
COMPLETE_WITH("PUBLICATION", "SEQUENCES");
|
||||
|
|
@ -3870,9 +3870,16 @@ match_previous_words(int pattern_id,
|
|||
|
||||
/* CREATE SUBSCRIPTION */
|
||||
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
|
||||
COMPLETE_WITH("CONNECTION");
|
||||
COMPLETE_WITH("SERVER", "CONNECTION");
|
||||
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER", MatchAny))
|
||||
COMPLETE_WITH("PUBLICATION");
|
||||
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
|
||||
COMPLETE_WITH("PUBLICATION");
|
||||
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "SERVER",
|
||||
MatchAny, "PUBLICATION"))
|
||||
{
|
||||
/* complete with nothing here as this refers to remote publications */
|
||||
}
|
||||
else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
|
||||
MatchAny, "PUBLICATION"))
|
||||
{
|
||||
|
|
|
|||
|
|
@ -57,6 +57,6 @@
|
|||
*/
|
||||
|
||||
/* yyyymmddN */
|
||||
#define CATALOG_VERSION_NO 202603061
|
||||
#define CATALOG_VERSION_NO 202603062
|
||||
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -38,6 +38,9 @@ CATALOG(pg_foreign_data_wrapper,2328,ForeignDataWrapperRelationId)
|
|||
Oid fdwvalidator BKI_LOOKUP_OPT(pg_proc); /* option validation
|
||||
* function, or 0 if
|
||||
* none */
|
||||
Oid fdwconnection BKI_LOOKUP_OPT(pg_proc); /* connection string
|
||||
* function, or 0 if
|
||||
* none */
|
||||
|
||||
#ifdef CATALOG_VARLEN /* variable-length fields start here */
|
||||
aclitem fdwacl[1]; /* access permissions */
|
||||
|
|
|
|||
|
|
@ -92,9 +92,12 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
|
|||
* exceeded max_retention_duration, when
|
||||
* defined */
|
||||
|
||||
Oid subserver BKI_LOOKUP_OPT(pg_foreign_server); /* If connection uses
|
||||
* server */
|
||||
|
||||
#ifdef CATALOG_VARLEN /* variable-length fields start here */
|
||||
/* Connection string to the publisher */
|
||||
text subconninfo BKI_FORCE_NOT_NULL;
|
||||
text subconninfo; /* Set if connecting with connection string */
|
||||
|
||||
/* Slot name on publisher */
|
||||
NameData subslotname BKI_FORCE_NULL;
|
||||
|
|
@ -207,7 +210,8 @@ typedef struct Subscription
|
|||
|
||||
#endif /* EXPOSE_TO_CLIENT_CODE */
|
||||
|
||||
extern Subscription *GetSubscription(Oid subid, bool missing_ok);
|
||||
extern Subscription *GetSubscription(Oid subid, bool missing_ok,
|
||||
bool aclcheck);
|
||||
extern void FreeSubscription(Subscription *sub);
|
||||
extern void DisableSubscription(Oid subid);
|
||||
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ typedef struct ForeignDataWrapper
|
|||
char *fdwname; /* Name of the FDW */
|
||||
Oid fdwhandler; /* Oid of handler function, or 0 */
|
||||
Oid fdwvalidator; /* Oid of validator function, or 0 */
|
||||
Oid fdwconnection; /* Oid of connection string function, or 0 */
|
||||
List *options; /* fdwoptions as DefElem list */
|
||||
} ForeignDataWrapper;
|
||||
|
||||
|
|
@ -65,10 +66,12 @@ typedef struct ForeignTable
|
|||
|
||||
|
||||
extern ForeignServer *GetForeignServer(Oid serverid);
|
||||
extern char *ForeignServerName(Oid serverid);
|
||||
extern ForeignServer *GetForeignServerExtended(Oid serverid,
|
||||
bits16 flags);
|
||||
extern ForeignServer *GetForeignServerByName(const char *srvname,
|
||||
bool missing_ok);
|
||||
extern char *ForeignServerConnectionString(Oid userid, Oid serverid);
|
||||
extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
|
||||
extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
|
||||
extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
|
||||
|
|
|
|||
|
|
@ -4383,6 +4383,7 @@ typedef struct CreateSubscriptionStmt
|
|||
{
|
||||
NodeTag type;
|
||||
char *subname; /* Name of the subscription */
|
||||
char *servername; /* Server name of publisher */
|
||||
char *conninfo; /* Connection string to publisher */
|
||||
List *publication; /* One or more publication to subscribe to */
|
||||
List *options; /* List of DefElem nodes */
|
||||
|
|
@ -4391,6 +4392,7 @@ typedef struct CreateSubscriptionStmt
|
|||
typedef enum AlterSubscriptionType
|
||||
{
|
||||
ALTER_SUBSCRIPTION_OPTIONS,
|
||||
ALTER_SUBSCRIPTION_SERVER,
|
||||
ALTER_SUBSCRIPTION_CONNECTION,
|
||||
ALTER_SUBSCRIPTION_SET_PUBLICATION,
|
||||
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
|
||||
|
|
@ -4406,6 +4408,7 @@ typedef struct AlterSubscriptionStmt
|
|||
NodeTag type;
|
||||
AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
|
||||
char *subname; /* Name of the subscription */
|
||||
char *servername; /* Server name of publisher */
|
||||
char *conninfo; /* Connection string to publisher */
|
||||
List *publication; /* One or more publication to subscribe to */
|
||||
List *options; /* List of DefElem nodes */
|
||||
|
|
|
|||
|
|
@ -224,6 +224,7 @@ NOTICE: checking pg_extension {extconfig} => pg_class {oid}
|
|||
NOTICE: checking pg_foreign_data_wrapper {fdwowner} => pg_authid {oid}
|
||||
NOTICE: checking pg_foreign_data_wrapper {fdwhandler} => pg_proc {oid}
|
||||
NOTICE: checking pg_foreign_data_wrapper {fdwvalidator} => pg_proc {oid}
|
||||
NOTICE: checking pg_foreign_data_wrapper {fdwconnection} => pg_proc {oid}
|
||||
NOTICE: checking pg_foreign_server {srvowner} => pg_authid {oid}
|
||||
NOTICE: checking pg_foreign_server {srvfdw} => pg_foreign_data_wrapper {oid}
|
||||
NOTICE: checking pg_user_mapping {umuser} => pg_authid {oid}
|
||||
|
|
@ -269,5 +270,6 @@ NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid}
|
|||
NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid}
|
||||
NOTICE: checking pg_subscription {subdbid} => pg_database {oid}
|
||||
NOTICE: checking pg_subscription {subowner} => pg_authid {oid}
|
||||
NOTICE: checking pg_subscription {subserver} => pg_foreign_server {oid}
|
||||
NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid}
|
||||
NOTICE: checking pg_subscription_rel {srrelid} => pg_class {oid}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,14 @@
|
|||
--
|
||||
-- SUBSCRIPTION
|
||||
--
|
||||
-- directory paths and dlsuffix are passed to us in environment variables
|
||||
\getenv libdir PG_LIBDIR
|
||||
\getenv dlsuffix PG_DLSUFFIX
|
||||
\set regresslib :libdir '/regress' :dlsuffix
|
||||
CREATE FUNCTION test_fdw_connection(oid, oid, internal)
|
||||
RETURNS text
|
||||
AS :'regresslib', 'test_fdw_connection'
|
||||
LANGUAGE C;
|
||||
CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
|
||||
CREATE ROLE regress_subscription_user2;
|
||||
CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription;
|
||||
|
|
@ -116,18 +124,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU
|
|||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
\dRs+ regress_testsub4
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | none | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
|
||||
\dRs+ regress_testsub4
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub4 | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
DROP SUBSCRIPTION regress_testsub3;
|
||||
|
|
@ -140,15 +148,53 @@ ERROR: invalid connection string syntax: invalid connection option "i_dont_exis
|
|||
-- connecting, so this is reliable and safe)
|
||||
CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
|
||||
ERROR: subscription "regress_testsub5" could not connect to the publisher: invalid port number: "-1"
|
||||
CREATE FOREIGN DATA WRAPPER test_fdw;
|
||||
CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw;
|
||||
GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user3;
|
||||
-- fail, need USAGE privileges on server
|
||||
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
|
||||
ERROR: permission denied for foreign server test_server
|
||||
RESET SESSION AUTHORIZATION;
|
||||
GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user3;
|
||||
-- fail, need user mapping
|
||||
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
|
||||
ERROR: user mapping not found for user "regress_subscription_user3", server "test_server"
|
||||
CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
|
||||
-- fail, need CONNECTION clause
|
||||
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
|
||||
ERROR: foreign data wrapper "test_fdw" does not support subscription connections
|
||||
DETAIL: Foreign data wrapper must be defined with CONNECTION specified.
|
||||
RESET SESSION AUTHORIZATION;
|
||||
ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user3;
|
||||
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
|
||||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
|
||||
RESET SESSION AUTHORIZATION;
|
||||
REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user3;
|
||||
-- fail, must connect but lacks USAGE on server, as well as user mapping
|
||||
DROP SUBSCRIPTION regress_testsub6;
|
||||
ERROR: could not connect to publisher when attempting to drop replication slot "dummy": subscription owner "regress_subscription_user3" does not have permission on foreign server "test_server"
|
||||
HINT: Use ALTER SUBSCRIPTION ... DISABLE to disable the subscription, and then use ALTER SUBSCRIPTION ... SET (slot_name = NONE) to disassociate it from the slot.
|
||||
ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
|
||||
DROP SUBSCRIPTION regress_testsub6;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user;
|
||||
REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
|
||||
DROP SERVER test_server;
|
||||
DROP FOREIGN DATA WRAPPER test_fdw;
|
||||
-- fail - invalid connection string during ALTER
|
||||
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
|
||||
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
|
||||
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | test subscription
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 | test subscription
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
|
||||
|
|
@ -157,10 +203,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname');
|
|||
ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
|
||||
ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | f | t | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
|
||||
|
|
@ -176,10 +222,10 @@ ERROR: unrecognized subscription parameter: "create_slot"
|
|||
-- ok
|
||||
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00012345 | test subscription
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00012345 | test subscription
|
||||
(1 row)
|
||||
|
||||
-- ok - with lsn = NONE
|
||||
|
|
@ -188,10 +234,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE);
|
|||
ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
|
||||
ERROR: invalid WAL location (LSN): 0/0
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist2 | -1 | 0/00000000 | test subscription
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
|
|
@ -227,10 +273,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = '80s');
|
|||
ALTER SUBSCRIPTION regress_testsub_foo SET (wal_receiver_timeout = 'foobar');
|
||||
ERROR: invalid value for parameter "wal_receiver_timeout": "foobar"
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
|
||||
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | f | 0 | f | local | dbname=regress_doesnotexist2 | 80s | 0/00000000 | test subscription
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+------------------------------+------------------+------------+-------------------
|
||||
regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | parallel | d | f | any | t | f | f | | f | 0 | f | local | dbname=regress_doesnotexist2 | 80s | 0/00000000 | test subscription
|
||||
(1 row)
|
||||
|
||||
-- rename back to keep the rest simple
|
||||
|
|
@ -259,19 +305,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
|
|||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | t | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
|
||||
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
DROP SUBSCRIPTION regress_testsub;
|
||||
|
|
@ -283,27 +329,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
|
|||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
|
||||
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
-- fail - publication already exists
|
||||
|
|
@ -318,10 +364,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr
|
|||
ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
|
||||
ERROR: publication "testpub1" is already in subscription "regress_testsub"
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
-- fail - publication used more than once
|
||||
|
|
@ -336,10 +382,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub"
|
|||
-- ok - delete publications
|
||||
ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
DROP SUBSCRIPTION regress_testsub;
|
||||
|
|
@ -375,19 +421,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
|
|||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
-- we can alter streaming when two_phase enabled
|
||||
ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
|
||||
|
|
@ -397,10 +443,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
|
|||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
|
||||
|
|
@ -413,18 +459,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
|
|||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | t | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
|
||||
|
|
@ -437,10 +483,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
|
|||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
|
||||
|
|
@ -454,19 +500,19 @@ NOTICE: max_retention_duration is ineffective when retain_dead_tuples is disabl
|
|||
WARNING: subscription was created, but is not connected
|
||||
HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and alter the subscription to refresh publications.
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 1000 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 1000 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
-- ok
|
||||
ALTER SUBSCRIPTION regress_testsub SET (max_retention_duration = 0);
|
||||
\dRs+
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
List of subscriptions
|
||||
Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Server | Retain dead tuples | Max retention duration | Retention active | Synchronous commit | Conninfo | Receiver timeout | Skip LSN | Description
|
||||
-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------+--------------------+------------------------+------------------+--------------------+-----------------------------+------------------+------------+-------------
|
||||
regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | | f | 0 | f | off | dbname=regress_doesnotexist | -1 | 0/00000000 |
|
||||
(1 row)
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
|
||||
|
|
|
|||
|
|
@ -729,6 +729,13 @@ test_fdw_handler(PG_FUNCTION_ARGS)
|
|||
PG_RETURN_NULL();
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(test_fdw_connection);
|
||||
Datum
|
||||
test_fdw_connection(PG_FUNCTION_ARGS)
|
||||
{
|
||||
PG_RETURN_TEXT_P(cstring_to_text("dbname=regress_doesnotexist user=doesnotexist password=secret"));
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(is_catalog_text_unique_index_oid);
|
||||
Datum
|
||||
is_catalog_text_unique_index_oid(PG_FUNCTION_ARGS)
|
||||
|
|
|
|||
|
|
@ -2,6 +2,17 @@
|
|||
-- SUBSCRIPTION
|
||||
--
|
||||
|
||||
-- directory paths and dlsuffix are passed to us in environment variables
|
||||
\getenv libdir PG_LIBDIR
|
||||
\getenv dlsuffix PG_DLSUFFIX
|
||||
|
||||
\set regresslib :libdir '/regress' :dlsuffix
|
||||
|
||||
CREATE FUNCTION test_fdw_connection(oid, oid, internal)
|
||||
RETURNS text
|
||||
AS :'regresslib', 'test_fdw_connection'
|
||||
LANGUAGE C;
|
||||
|
||||
CREATE ROLE regress_subscription_user LOGIN SUPERUSER;
|
||||
CREATE ROLE regress_subscription_user2;
|
||||
CREATE ROLE regress_subscription_user3 IN ROLE pg_create_subscription;
|
||||
|
|
@ -85,6 +96,50 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'i_dont_exist=param' PUBLICATION
|
|||
-- connecting, so this is reliable and safe)
|
||||
CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
|
||||
|
||||
CREATE FOREIGN DATA WRAPPER test_fdw;
|
||||
CREATE SERVER test_server FOREIGN DATA WRAPPER test_fdw;
|
||||
|
||||
GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user3;
|
||||
|
||||
-- fail, need USAGE privileges on server
|
||||
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
|
||||
|
||||
RESET SESSION AUTHORIZATION;
|
||||
GRANT USAGE ON FOREIGN SERVER test_server TO regress_subscription_user3;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user3;
|
||||
|
||||
-- fail, need user mapping
|
||||
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
|
||||
|
||||
CREATE USER MAPPING FOR regress_subscription_user3 SERVER test_server OPTIONS(user 'foo', password 'secret');
|
||||
|
||||
-- fail, need CONNECTION clause
|
||||
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = NONE, connect = false);
|
||||
|
||||
RESET SESSION AUTHORIZATION;
|
||||
ALTER FOREIGN DATA WRAPPER test_fdw CONNECTION test_fdw_connection;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user3;
|
||||
|
||||
CREATE SUBSCRIPTION regress_testsub6 SERVER test_server PUBLICATION testpub WITH (slot_name = 'dummy', connect = false);
|
||||
|
||||
DROP USER MAPPING FOR regress_subscription_user3 SERVER test_server;
|
||||
RESET SESSION AUTHORIZATION;
|
||||
REVOKE USAGE ON FOREIGN SERVER test_server FROM regress_subscription_user3;
|
||||
SET SESSION AUTHORIZATION regress_subscription_user3;
|
||||
|
||||
-- fail, must connect but lacks USAGE on server, as well as user mapping
|
||||
DROP SUBSCRIPTION regress_testsub6;
|
||||
|
||||
ALTER SUBSCRIPTION regress_testsub6 SET (slot_name = NONE);
|
||||
DROP SUBSCRIPTION regress_testsub6;
|
||||
|
||||
SET SESSION AUTHORIZATION regress_subscription_user;
|
||||
REVOKE CREATE ON DATABASE REGRESSION FROM regress_subscription_user3;
|
||||
|
||||
DROP SERVER test_server;
|
||||
DROP FOREIGN DATA WRAPPER test_fdw;
|
||||
|
||||
-- fail - invalid connection string during ALTER
|
||||
ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue