GetSubscription(): use per-object memory context.

Constructing a Subcription object uses a number of small or temporary
allocations. Use a per-object memory context for easy cleanup.

Get rid of FreeSubscription() which did not free all the allocations
anyway. Also get rid of the PG_TRY()/PG_CATCH() logic in
ForeignServerConnectionString() which were used to avoid leaks during
GetSubscription().

Co-authored-by: Álvaro Herrera <alvherre@kurilemu.de>
Suggested-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/xvdjrdqnpap3uq7owbaox3r7p5gf7sv62aaqf2ju3vb6yglatr%40kvvwhoudrlxq
Discussion: https://postgr.es/m/CAA4eK1K=WjZ1maBCmj=5ZdO66AwPORK5ZBxVKedS0xdCcb621A@mail.gmail.com
This commit is contained in:
Jeff Davis 2026-03-24 15:10:03 -07:00
parent a881cc9c7e
commit f16f5d608c
4 changed files with 48 additions and 81 deletions

View file

@ -32,6 +32,7 @@
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/syscache.h"
@ -80,6 +81,8 @@ GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
Form_pg_subscription subform;
Datum datum;
bool isnull;
MemoryContext cxt;
MemoryContext oldcxt;
tup = SearchSysCache1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));
@ -91,9 +94,14 @@ GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
elog(ERROR, "cache lookup failed for subscription %u", subid);
}
cxt = AllocSetContextCreate(CurrentMemoryContext, "subscription",
ALLOCSET_SMALL_SIZES);
oldcxt = MemoryContextSwitchTo(cxt);
subform = (Form_pg_subscription) GETSTRUCT(tup);
sub = palloc_object(Subscription);
sub->cxt = cxt;
sub->oid = subid;
sub->dbid = subform->subdbid;
sub->skiplsn = subform->subskiplsn;
@ -181,6 +189,8 @@ GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
ReleaseSysCache(tup);
MemoryContextSwitchTo(oldcxt);
return sub;
}
@ -217,20 +227,6 @@ CountDBSubscriptions(Oid dbid)
return nsubs;
}
/*
* Free memory allocated by subscription struct.
*/
void
FreeSubscription(Subscription *sub)
{
pfree(sub->name);
pfree(sub->conninfo);
if (sub->slotname)
pfree(sub->slotname);
list_free_deep(sub->publications);
pfree(sub);
}
/*
* Disable the given subscription.
*/

View file

@ -220,62 +220,32 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
/*
* Retrieve connection string from server's FDW.
*
* NB: leaks into CurrentMemoryContext.
*/
char *
ForeignServerConnectionString(Oid userid, Oid serverid)
{
MemoryContext tempContext;
MemoryContext oldcxt;
text *volatile connection_text = NULL;
char *result = NULL;
ForeignServer *server;
ForeignDataWrapper *fdw;
Datum connection_datum;
/*
* GetForeignServer, GetForeignDataWrapper, and the connection function
* itself all leak memory into CurrentMemoryContext. Switch to a temporary
* context for easy cleanup.
*/
tempContext = AllocSetContextCreate(CurrentMemoryContext,
"FDWConnectionContext",
ALLOCSET_SMALL_SIZES);
server = GetForeignServer(serverid);
fdw = GetForeignDataWrapper(server->fdwid);
oldcxt = MemoryContextSwitchTo(tempContext);
if (!OidIsValid(fdw->fdwconnection))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("foreign data wrapper \"%s\" does not support subscription connections",
fdw->fdwname),
errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
PG_TRY();
{
ForeignServer *server;
ForeignDataWrapper *fdw;
Datum connection_datum;
connection_datum = OidFunctionCall3(fdw->fdwconnection,
ObjectIdGetDatum(userid),
ObjectIdGetDatum(serverid),
PointerGetDatum(NULL));
server = GetForeignServer(serverid);
fdw = GetForeignDataWrapper(server->fdwid);
if (!OidIsValid(fdw->fdwconnection))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("foreign data wrapper \"%s\" does not support subscription connections",
fdw->fdwname),
errdetail("Foreign data wrapper must be defined with CONNECTION specified.")));
connection_datum = OidFunctionCall3(fdw->fdwconnection,
ObjectIdGetDatum(userid),
ObjectIdGetDatum(serverid),
PointerGetDatum(NULL));
connection_text = DatumGetTextPP(connection_datum);
}
PG_FINALLY();
{
MemoryContextSwitchTo(oldcxt);
if (connection_text)
result = text_to_cstring((text *) connection_text);
MemoryContextDelete(tempContext);
}
PG_END_TRY();
return result;
return text_to_cstring(DatumGetTextPP(connection_datum));
}

View file

@ -5043,7 +5043,6 @@ apply_worker_exit(void)
void
maybe_reread_subscription(void)
{
MemoryContext oldctx;
Subscription *newsub;
bool started_tx = false;
@ -5058,17 +5057,18 @@ maybe_reread_subscription(void)
started_tx = true;
}
/* Ensure allocations in permanent context. */
oldctx = MemoryContextSwitchTo(ApplyContext);
newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
/*
* Exit if the subscription was removed. This normally should not happen
* as the worker gets killed during DROP SUBSCRIPTION.
*/
if (!newsub)
if (newsub)
{
MemoryContextSetParent(newsub->cxt, ApplyContext);
}
else
{
/*
* Exit if the subscription was removed. This normally should not
* happen as the worker gets killed during DROP SUBSCRIPTION.
*/
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will stop because the subscription was removed",
MySubscription->name)));
@ -5151,11 +5151,9 @@ maybe_reread_subscription(void)
}
/* Clean old subscription info and switch to new one. */
FreeSubscription(MySubscription);
MemoryContextDelete(MySubscription->cxt);
MySubscription = newsub;
MemoryContextSwitchTo(oldctx);
/* Change synchronous commit according to the user's wishes */
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
@ -5779,8 +5777,6 @@ run_apply_worker(void)
void
InitializeLogRepWorker(void)
{
MemoryContext oldctx;
/* Run as replica session replication role. */
SetConfigOption("session_replication_role", "replica",
PGC_SUSET, PGC_S_OVERRIDE);
@ -5796,12 +5792,11 @@ InitializeLogRepWorker(void)
*/
SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
/* Load the subscription into persistent memory context. */
ApplyContext = AllocSetContextCreate(TopMemoryContext,
"ApplyContext",
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
oldctx = MemoryContextSwitchTo(ApplyContext);
/*
* Lock the subscription to prevent it from being concurrently dropped,
@ -5810,8 +5805,14 @@ InitializeLogRepWorker(void)
*/
LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, 0,
AccessShareLock);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
if (!MySubscription)
if (MySubscription)
{
MemoryContextSetParent(MySubscription->cxt, ApplyContext);
}
else
{
ereport(LOG,
(errmsg("logical replication worker for subscription %u will not start because the subscription was removed during startup",
@ -5825,7 +5826,6 @@ InitializeLogRepWorker(void)
}
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
if (!MySubscription->enabled)
{

View file

@ -130,6 +130,8 @@ MAKE_SYSCACHE(SUBSCRIPTIONNAME, pg_subscription_subname_index, 4);
typedef struct Subscription
{
MemoryContext cxt; /* mem cxt containing this subscription */
Oid oid; /* Oid of the subscription */
Oid dbid; /* Oid of the database which subscription is
* in */
@ -212,7 +214,6 @@ typedef struct Subscription
extern Subscription *GetSubscription(Oid subid, bool missing_ok,
bool aclcheck);
extern void FreeSubscription(Subscription *sub);
extern void DisableSubscription(Oid subid);
extern int CountDBSubscriptions(Oid dbid);