postgresql/src/backend/replication/logical/relation.c
Amit Kapila ac0e2d387a Fix memory leak due to LogicalRepRelMapEntry.attrmap.
When rebuilding the relation mapping on subscribers, we were not releasing
the attribute mapping's memory which was no longer required.

The attribute mapping used in logical tuple conversion was refactored in
PG13 (by commit e1551f96e6) but we forgot to update the related code that
frees the attribute map.

Author: Hou Zhijie
Reviewed-by: Amit Langote, Amit Kapila, Shi yu
Backpatch-through: 10, where it was introduced
Discussion: https://postgr.es/m/OSZPR01MB6310F46CD425A967E4AEF736FDA49@OSZPR01MB6310.jpnprd01.prod.outlook.com
2022-06-23 09:23:46 +05:30

705 lines
19 KiB
C

/*-------------------------------------------------------------------------
* relation.c
* PostgreSQL logical replication relation mapping cache
*
* Copyright (c) 2016-2022, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/relation.c
*
* NOTES
* Routines in this file mainly have to do with mapping the properties
* of local replication target relations to the properties of their
* remote counterpart.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/table.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription_rel.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "replication/logicalrelation.h"
#include "replication/worker_internal.h"
#include "utils/inval.h"
static MemoryContext LogicalRepRelMapContext = NULL;
static HTAB *LogicalRepRelMap = NULL;
/*
* Partition map (LogicalRepPartMap)
*
* When a partitioned table is used as replication target, replicated
* operations are actually performed on its leaf partitions, which requires
* the partitions to also be mapped to the remote relation. Parent's entry
* (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
* individual partitions may have different attribute numbers, which means
* attribute mappings to remote relation's attributes must be maintained
* separately for each partition.
*/
static MemoryContext LogicalRepPartMapContext = NULL;
static HTAB *LogicalRepPartMap = NULL;
typedef struct LogicalRepPartMapEntry
{
Oid partoid; /* LogicalRepPartMap's key */
LogicalRepRelMapEntry relmapentry;
} LogicalRepPartMapEntry;
/*
* Relcache invalidation callback for our relation map cache.
*/
static void
logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
{
LogicalRepRelMapEntry *entry;
/* Just to be sure. */
if (LogicalRepRelMap == NULL)
return;
if (reloid != InvalidOid)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepRelMap);
/* TODO, use inverse lookup hashtable? */
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
{
if (entry->localreloid == reloid)
{
entry->localrelvalid = false;
hash_seq_term(&status);
break;
}
}
}
else
{
/* invalidate all cache entries */
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepRelMap);
while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
entry->localrelvalid = false;
}
}
/*
* Initialize the relation map cache.
*/
static void
logicalrep_relmap_init(void)
{
HASHCTL ctl;
if (!LogicalRepRelMapContext)
LogicalRepRelMapContext =
AllocSetContextCreate(CacheMemoryContext,
"LogicalRepRelMapContext",
ALLOCSET_DEFAULT_SIZES);
/* Initialize the relation hash table. */
ctl.keysize = sizeof(LogicalRepRelId);
ctl.entrysize = sizeof(LogicalRepRelMapEntry);
ctl.hcxt = LogicalRepRelMapContext;
LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
}
/*
* Free the entry of a relation map cache.
*/
static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
{
LogicalRepRelation *remoterel;
remoterel = &entry->remoterel;
pfree(remoterel->nspname);
pfree(remoterel->relname);
if (remoterel->natts > 0)
{
int i;
for (i = 0; i < remoterel->natts; i++)
pfree(remoterel->attnames[i]);
pfree(remoterel->attnames);
pfree(remoterel->atttyps);
}
bms_free(remoterel->attkeys);
if (entry->attrmap)
free_attrmap(entry->attrmap);
}
/*
* Add new entry or update existing entry in the relation map cache.
*
* Called when new relation mapping is sent by the publisher to update
* our expected view of incoming data from said publisher.
*/
void
logicalrep_relmap_update(LogicalRepRelation *remoterel)
{
MemoryContext oldctx;
LogicalRepRelMapEntry *entry;
bool found;
int i;
if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();
/*
* HASH_ENTER returns the existing entry if present or creates a new one.
*/
entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
HASH_ENTER, &found);
if (found)
logicalrep_relmap_free_entry(entry);
memset(entry, 0, sizeof(LogicalRepRelMapEntry));
/* Make cached copy of the data */
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
entry->remoterel.remoteid = remoterel->remoteid;
entry->remoterel.nspname = pstrdup(remoterel->nspname);
entry->remoterel.relname = pstrdup(remoterel->relname);
entry->remoterel.natts = remoterel->natts;
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
for (i = 0; i < remoterel->natts; i++)
{
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
}
entry->remoterel.replident = remoterel->replident;
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
MemoryContextSwitchTo(oldctx);
}
/*
* Find attribute index in TupleDesc struct by attribute name.
*
* Returns -1 if not found.
*/
static int
logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
{
int i;
for (i = 0; i < remoterel->natts; i++)
{
if (strcmp(remoterel->attnames[i], attname) == 0)
return i;
}
return -1;
}
/*
* Report error with names of the missing local relation column(s), if any.
*/
static void
logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
Bitmapset *missingatts)
{
if (!bms_is_empty(missingatts))
{
StringInfoData missingattsbuf;
int missingattcnt = 0;
int i;
initStringInfo(&missingattsbuf);
while ((i = bms_first_member(missingatts)) >= 0)
{
missingattcnt++;
if (missingattcnt == 1)
appendStringInfo(&missingattsbuf, _("\"%s\""),
remoterel->attnames[i]);
else
appendStringInfo(&missingattsbuf, _(", \"%s\""),
remoterel->attnames[i]);
}
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("logical replication target relation \"%s.%s\" is missing replicated column: %s",
"logical replication target relation \"%s.%s\" is missing replicated columns: %s",
missingattcnt,
remoterel->nspname,
remoterel->relname,
missingattsbuf.data)));
}
}
/*
* Check if replica identity matches and mark the updatable flag.
*
* We allow for stricter replica identity (fewer columns) on subscriber as
* that will not stop us from finding unique tuple. IE, if publisher has
* identity (id,timestamp) and subscriber just (id) this will not be a
* problem, but in the opposite scenario it will.
*
* We just mark the relation entry as not updatable here if the local
* replica identity is found to be insufficient for applying
* updates/deletes (inserts don't care!) and leave it to
* check_relation_updatable() to throw the actual error if needed.
*/
static void
logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
{
Bitmapset *idkey;
LogicalRepRelation *remoterel = &entry->remoterel;
int i;
entry->updatable = true;
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_IDENTITY_KEY);
/* fallback to PK if no replica identity */
if (idkey == NULL)
{
idkey = RelationGetIndexAttrBitmap(entry->localrel,
INDEX_ATTR_BITMAP_PRIMARY_KEY);
/*
* If no replica identity index and no PK, the published table must
* have replica identity FULL.
*/
if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
entry->updatable = false;
}
i = -1;
while ((i = bms_next_member(idkey, i)) >= 0)
{
int attnum = i + FirstLowInvalidHeapAttributeNumber;
if (!AttrNumberIsForUserDefinedAttr(attnum))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" uses "
"system columns in REPLICA IDENTITY index",
remoterel->nspname, remoterel->relname)));
attnum = AttrNumberGetAttrOffset(attnum);
if (entry->attrmap->attnums[attnum] < 0 ||
!bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys))
{
entry->updatable = false;
break;
}
}
}
/*
* Open the local relation associated with the remote one.
*
* Rebuilds the Relcache mapping if it was invalidated by local DDL.
*/
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
LogicalRepRelMapEntry *entry;
bool found;
LogicalRepRelation *remoterel;
if (LogicalRepRelMap == NULL)
logicalrep_relmap_init();
/* Search for existing entry. */
entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
HASH_FIND, &found);
if (!found)
elog(ERROR, "no relation map entry for remote relation ID %u",
remoteid);
remoterel = &entry->remoterel;
/* Ensure we don't leak a relcache refcount. */
if (entry->localrel)
elog(ERROR, "remote relation ID %u is already open", remoteid);
/*
* When opening and locking a relation, pending invalidation messages are
* processed which can invalidate the relation. Hence, if the entry is
* currently considered valid, try to open the local relation by OID and
* see if invalidation ensues.
*/
if (entry->localrelvalid)
{
entry->localrel = try_table_open(entry->localreloid, lockmode);
if (!entry->localrel)
{
/* Table was renamed or dropped. */
entry->localrelvalid = false;
}
else if (!entry->localrelvalid)
{
/* Note we release the no-longer-useful lock here. */
table_close(entry->localrel, lockmode);
entry->localrel = NULL;
}
}
/*
* If the entry has been marked invalid since we last had lock on it,
* re-open the local relation by name and rebuild all derived data.
*/
if (!entry->localrelvalid)
{
Oid relid;
TupleDesc desc;
MemoryContext oldctx;
int i;
Bitmapset *missingatts;
/* Release the no-longer-useful attrmap, if any. */
if (entry->attrmap)
{
free_attrmap(entry->attrmap);
entry->attrmap = NULL;
}
/* Try to find and lock the relation by name. */
relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
remoterel->relname, -1),
lockmode, true);
if (!OidIsValid(relid))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication target relation \"%s.%s\" does not exist",
remoterel->nspname, remoterel->relname)));
entry->localrel = table_open(relid, NoLock);
entry->localreloid = relid;
/* Check for supported relkind. */
CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
remoterel->nspname, remoterel->relname);
/*
* Build the mapping of local attribute numbers to remote attribute
* numbers and validate that we don't miss any replicated columns as
* that would result in potentially unwanted data loss.
*/
desc = RelationGetDescr(entry->localrel);
oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
entry->attrmap = make_attrmap(desc->natts);
MemoryContextSwitchTo(oldctx);
/* check and report missing attrs, if any */
missingatts = bms_add_range(NULL, 0, remoterel->natts - 1);
for (i = 0; i < desc->natts; i++)
{
int attnum;
Form_pg_attribute attr = TupleDescAttr(desc, i);
if (attr->attisdropped || attr->attgenerated)
{
entry->attrmap->attnums[i] = -1;
continue;
}
attnum = logicalrep_rel_att_by_name(remoterel,
NameStr(attr->attname));
entry->attrmap->attnums[i] = attnum;
if (attnum >= 0)
missingatts = bms_del_member(missingatts, attnum);
}
logicalrep_report_missing_attrs(remoterel, missingatts);
/* be tidy */
bms_free(missingatts);
/*
* Set if the table's replica identity is enough to apply
* update/delete.
*/
logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
}
if (entry->state != SUBREL_STATE_READY)
entry->state = GetSubscriptionRelState(MySubscription->oid,
entry->localreloid,
&entry->statelsn);
return entry;
}
/*
* Close the previously opened logical relation.
*/
void
logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
{
table_close(rel->localrel, lockmode);
rel->localrel = NULL;
}
/*
* Partition cache: look up partition LogicalRepRelMapEntry's
*
* Unlike relation map cache, this is keyed by partition OID, not remote
* relation OID, because we only have to use this cache in the case where
* partitions are not directly mapped to any remote relation, such as when
* replication is occurring with one of their ancestors as target.
*/
/*
* Relcache invalidation callback
*/
static void
logicalrep_partmap_invalidate_cb(Datum arg, Oid reloid)
{
LogicalRepPartMapEntry *entry;
/* Just to be sure. */
if (LogicalRepPartMap == NULL)
return;
if (reloid != InvalidOid)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepPartMap);
/* TODO, use inverse lookup hashtable? */
while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
{
if (entry->relmapentry.localreloid == reloid)
{
entry->relmapentry.localrelvalid = false;
hash_seq_term(&status);
break;
}
}
}
else
{
/* invalidate all cache entries */
HASH_SEQ_STATUS status;
hash_seq_init(&status, LogicalRepPartMap);
while ((entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
entry->relmapentry.localrelvalid = false;
}
}
/*
* Reset the entries in the partition map that refer to remoterel.
*
* Called when new relation mapping is sent by the publisher to update our
* expected view of incoming data from said publisher.
*
* Note that we don't update the remoterel information in the entry here,
* we will update the information in logicalrep_partition_open to avoid
* unnecessary work.
*/
void
logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel)
{
HASH_SEQ_STATUS status;
LogicalRepPartMapEntry *part_entry;
LogicalRepRelMapEntry *entry;
if (LogicalRepPartMap == NULL)
return;
hash_seq_init(&status, LogicalRepPartMap);
while ((part_entry = (LogicalRepPartMapEntry *) hash_seq_search(&status)) != NULL)
{
entry = &part_entry->relmapentry;
if (entry->remoterel.remoteid != remoterel->remoteid)
continue;
logicalrep_relmap_free_entry(entry);
memset(entry, 0, sizeof(LogicalRepRelMapEntry));
}
}
/*
* Initialize the partition map cache.
*/
static void
logicalrep_partmap_init(void)
{
HASHCTL ctl;
if (!LogicalRepPartMapContext)
LogicalRepPartMapContext =
AllocSetContextCreate(CacheMemoryContext,
"LogicalRepPartMapContext",
ALLOCSET_DEFAULT_SIZES);
/* Initialize the relation hash table. */
ctl.keysize = sizeof(Oid); /* partition OID */
ctl.entrysize = sizeof(LogicalRepPartMapEntry);
ctl.hcxt = LogicalRepPartMapContext;
LogicalRepPartMap = hash_create("logicalrep partition map cache", 64, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_partmap_invalidate_cb,
(Datum) 0);
}
/*
* logicalrep_partition_open
*
* Returned entry reuses most of the values of the root table's entry, save
* the attribute map, which can be different for the partition. However,
* we must physically copy all the data, in case the root table's entry
* gets freed/rebuilt.
*
* Note there's no logicalrep_partition_close, because the caller closes the
* component relation.
*/
LogicalRepRelMapEntry *
logicalrep_partition_open(LogicalRepRelMapEntry *root,
Relation partrel, AttrMap *map)
{
LogicalRepRelMapEntry *entry;
LogicalRepPartMapEntry *part_entry;
LogicalRepRelation *remoterel = &root->remoterel;
Oid partOid = RelationGetRelid(partrel);
AttrMap *attrmap = root->attrmap;
bool found;
MemoryContext oldctx;
if (LogicalRepPartMap == NULL)
logicalrep_partmap_init();
/* Search for existing entry. */
part_entry = (LogicalRepPartMapEntry *) hash_search(LogicalRepPartMap,
(void *) &partOid,
HASH_ENTER, &found);
entry = &part_entry->relmapentry;
/*
* We must always overwrite entry->localrel with the latest partition
* Relation pointer, because the Relation pointed to by the old value may
* have been cleared after the caller would have closed the partition
* relation after the last use of this entry. Note that localrelvalid is
* only updated by the relcache invalidation callback, so it may still be
* true irrespective of whether the Relation pointed to by localrel has
* been cleared or not.
*/
if (found && entry->localrelvalid)
{
entry->localrel = partrel;
return entry;
}
/* Switch to longer-lived context. */
oldctx = MemoryContextSwitchTo(LogicalRepPartMapContext);
if (!found)
{
memset(part_entry, 0, sizeof(LogicalRepPartMapEntry));
part_entry->partoid = partOid;
}
/* Release the no-longer-useful attrmap, if any. */
if (entry->attrmap)
{
free_attrmap(entry->attrmap);
entry->attrmap = NULL;
}
if (!entry->remoterel.remoteid)
{
int i;
/* Remote relation is copied as-is from the root entry. */
entry = &part_entry->relmapentry;
entry->remoterel.remoteid = remoterel->remoteid;
entry->remoterel.nspname = pstrdup(remoterel->nspname);
entry->remoterel.relname = pstrdup(remoterel->relname);
entry->remoterel.natts = remoterel->natts;
entry->remoterel.attnames = palloc(remoterel->natts * sizeof(char *));
entry->remoterel.atttyps = palloc(remoterel->natts * sizeof(Oid));
for (i = 0; i < remoterel->natts; i++)
{
entry->remoterel.attnames[i] = pstrdup(remoterel->attnames[i]);
entry->remoterel.atttyps[i] = remoterel->atttyps[i];
}
entry->remoterel.replident = remoterel->replident;
entry->remoterel.attkeys = bms_copy(remoterel->attkeys);
}
entry->localrel = partrel;
entry->localreloid = partOid;
/*
* If the partition's attributes don't match the root relation's, we'll
* need to make a new attrmap which maps partition attribute numbers to
* remoterel's, instead of the original which maps root relation's
* attribute numbers to remoterel's.
*
* Note that 'map' which comes from the tuple routing data structure
* contains 1-based attribute numbers (of the parent relation). However,
* the map in 'entry', a logical replication data structure, contains
* 0-based attribute numbers (of the remote relation).
*/
if (map)
{
AttrNumber attno;
entry->attrmap = make_attrmap(map->maplen);
for (attno = 0; attno < entry->attrmap->maplen; attno++)
{
AttrNumber root_attno = map->attnums[attno];
/* 0 means it's a dropped attribute. See comments atop AttrMap. */
if (root_attno == 0)
entry->attrmap->attnums[attno] = -1;
else
entry->attrmap->attnums[attno] = attrmap->attnums[root_attno - 1];
}
}
else
{
/* Lacking copy_attmap, do this the hard way. */
entry->attrmap = make_attrmap(attrmap->maplen);
memcpy(entry->attrmap->attnums, attrmap->attnums,
attrmap->maplen * sizeof(AttrNumber));
}
/* Set if the table's replica identity is enough to apply update/delete. */
logicalrep_rel_mark_updatable(entry);
entry->localrelvalid = true;
/* state and statelsn are left set to 0. */
MemoryContextSwitchTo(oldctx);
return entry;
}