postgresql/src/backend/replication/logical/relation.c
Tom Lane d6d70f89a6 Fix bogus cache-invalidation logic in logical replication worker.
The code recorded cache invalidation events by zeroing the "localreloid"
field of affected cache entries.  However, it's possible for an inval
event to occur even while we have the entry open and locked.  So an
ill-timed inval could result in "cache lookup failed for relation 0"
errors, if the worker's code tried to use the cleared field.  We can
fix that by creating a separate bool field to record whether the entry
needs to be revalidated.  (In the back branches, cram the bool into
what had been padding space, to avoid an ABI break in the somewhat
unlikely event that any extension is looking at this struct.)

Also, rearrange the logic in logicalrep_rel_open so that it
does the right thing in cases where table_open would fail.
We should retry the lookup by name in that case, but we didn't.

The real-world impact of this is probably small.  In the first place,
the error conditions are very low probability, and in the second place,
the worker would just exit and get restarted.  We only noticed because
in a CLOBBER_CACHE_ALWAYS build, the failure can occur repeatedly,
preventing the worker from making progress.  Nonetheless, it's clearly
a bug, and it impedes a useful type of testing; so back-patch to v10
where this code was introduced.

Discussion: https://postgr.es/m/1032727.1600096803@sss.pgh.pa.us
2020-09-16 12:07:31 -04:00

488 lines
13 KiB
C

/*-------------------------------------------------------------------------
* relation.c
* PostgreSQL logical replication
*
* Copyright (c) 2016-2017, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/relation.c
*
* NOTES
* This file contains helper functions for logical replication relation
* mapping cache.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/sysattr.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription_rel.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "replication/logicalrelation.h"
#include "replication/worker_internal.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
/*
 * Memory context used for both hash tables below and for the per-entry data
 * (names, attribute arrays, attribute maps) that this file copies into them.
 */
static MemoryContext LogicalRepRelMapContext = NULL;
/* Relation map cache: LogicalRepRelId -> LogicalRepRelMapEntry. */
static HTAB *LogicalRepRelMap = NULL;
/* Type map cache: remote type Oid -> LogicalRepTyp. */
static HTAB *LogicalRepTypMap = NULL;
/*
 * Relcache invalidation callback for the relation map cache.
 *
 * With a valid reloid, the first entry whose local relation has that OID is
 * flagged as needing revalidation.  With InvalidOid, every entry is flagged.
 * Entries are only marked here; nothing is freed or rebuilt.
 *
 * "arg" is the Datum supplied at registration time and is not used.
 */
static void
logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid)
{
	HASH_SEQ_STATUS scan;
	LogicalRepRelMapEntry *cur;

	/* Just to be sure: nothing to do if the cache was never created. */
	if (LogicalRepRelMap == NULL)
		return;

	hash_seq_init(&scan, LogicalRepRelMap);

	if (reloid == InvalidOid)
	{
		/* Blanket invalidation: flag every cache entry. */
		while ((cur = (LogicalRepRelMapEntry *) hash_seq_search(&scan)) != NULL)
			cur->localrelvalid = false;
		return;
	}

	/* TODO, use inverse lookup hashtable? */
	while ((cur = (LogicalRepRelMapEntry *) hash_seq_search(&scan)) != NULL)
	{
		if (cur->localreloid != reloid)
			continue;

		cur->localrelvalid = false;

		/* We stop before the scan is exhausted, so terminate it explicitly. */
		hash_seq_term(&scan);
		break;
	}
}
/*
* Initialize the relation map cache.
*/
static void
logicalrep_relmap_init(void)
{
HASHCTL ctl;
if (!LogicalRepRelMapContext)
LogicalRepRelMapContext =
AllocSetContextCreate(CacheMemoryContext,
"LogicalRepRelMapContext",
ALLOCSET_DEFAULT_SIZES);
/* Initialize the relation hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(LogicalRepRelId);
ctl.entrysize = sizeof(LogicalRepRelMapEntry);
ctl.hcxt = LogicalRepRelMapContext;
LogicalRepRelMap = hash_create("logicalrep relation map cache", 128, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Initialize the type hash table. */
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(Oid);
ctl.entrysize = sizeof(LogicalRepTyp);
ctl.hcxt = LogicalRepRelMapContext;
/* This will usually be small. */
LogicalRepTypMap = hash_create("logicalrep type map cache", 2, &ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
/* Watch for invalidation events. */
CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb,
(Datum) 0);
}
/*
 * Release the data owned by a relation map cache entry.
 *
 * Frees the remote schema and relation names, the attribute name and type
 * arrays (only when the remote relation has at least one attribute), the
 * key-attribute bitmap, and the local attribute map if one exists.  The
 * entry itself is not freed and its pointers are left as they were.
 */
static void
logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry)
{
	LogicalRepRelation *rel = &entry->remoterel;
	int			attno;

	pfree(rel->nspname);
	pfree(rel->relname);

	if (rel->natts > 0)
	{
		/* Each attribute name is its own allocation. */
		for (attno = 0; attno < rel->natts; attno++)
			pfree(rel->attnames[attno]);

		pfree(rel->attnames);
		pfree(rel->atttyps);
	}

	bms_free(rel->attkeys);

	if (entry->attrmap != NULL)
		pfree(entry->attrmap);
}
/*
 * Insert or replace the cache entry describing a remote relation.
 *
 * Called when the publisher sends a new relation mapping, to update our
 * expected view of incoming data from said publisher.  Any data held by an
 * existing entry is freed and the entry is zeroed, so all locally derived
 * state is reset along with it.  The description in "remoterel" is then
 * deep-copied into LogicalRepRelMapContext.
 */
void
logicalrep_relmap_update(LogicalRepRelation *remoterel)
{
	LogicalRepRelMapEntry *entry;
	LogicalRepRelation *cached;
	MemoryContext callerctx;
	bool		existed;
	int			natts = remoterel->natts;
	int			attno;

	if (LogicalRepRelMap == NULL)
		logicalrep_relmap_init();

	/*
	 * HASH_ENTER returns the existing entry if present or creates a new one.
	 */
	entry = hash_search(LogicalRepRelMap, (void *) &remoterel->remoteid,
						HASH_ENTER, &existed);

	if (existed)
		logicalrep_relmap_free_entry(entry);

	/* Start from a clean slate; the key is written back just below. */
	memset(entry, 0, sizeof(LogicalRepRelMapEntry));

	/* Make cached copy of the data */
	callerctx = MemoryContextSwitchTo(LogicalRepRelMapContext);

	cached = &entry->remoterel;
	cached->remoteid = remoterel->remoteid;
	cached->nspname = pstrdup(remoterel->nspname);
	cached->relname = pstrdup(remoterel->relname);
	cached->natts = natts;
	cached->attnames = palloc(natts * sizeof(char *));
	cached->atttyps = palloc(natts * sizeof(Oid));

	for (attno = 0; attno < natts; attno++)
	{
		cached->attnames[attno] = pstrdup(remoterel->attnames[attno]);
		cached->atttyps[attno] = remoterel->atttyps[attno];
	}

	cached->replident = remoterel->replident;
	cached->attkeys = bms_copy(remoterel->attkeys);

	MemoryContextSwitchTo(callerctx);
}
/*
 * Look up an attribute of the remote relation by name.
 *
 * Returns the zero-based position of the first entry in
 * remoterel->attnames that equals "attname", or -1 if there is none.
 */
static int
logicalrep_rel_att_by_name(LogicalRepRelation *remoterel, const char *attname)
{
	int			idx = 0;

	while (idx < remoterel->natts)
	{
		if (strcmp(remoterel->attnames[idx], attname) == 0)
			return idx;
		idx++;
	}

	return -1;
}
/*
 * Open the local relation associated with the remote one.
 *
 * "remoteid" identifies an entry previously stored by
 * logicalrep_relmap_update(); "lockmode" is the lock taken on the local
 * relation.  On return entry->localrel is open, and the caller is expected
 * to release it with logicalrep_rel_close().
 *
 * If the entry is not currently marked valid (never resolved, or flagged by
 * the invalidation callback), the local relation is looked up by name and
 * all derived data (local OID, attribute map, updatable flag) is rebuilt.
 *
 * Raises an ERROR if there is no map entry for remoteid, if the entry is
 * already open, if the target relation does not exist, if it is missing
 * replicated columns, or if its replica identity index uses system columns.
 */
LogicalRepRelMapEntry *
logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
{
	LogicalRepRelMapEntry *entry;
	bool		found;
	LogicalRepRelation *remoterel;

	if (LogicalRepRelMap == NULL)
		logicalrep_relmap_init();

	/* Search for existing entry. */
	entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
						HASH_FIND, &found);

	if (!found)
		elog(ERROR, "no relation map entry for remote relation ID %u",
			 remoteid);

	remoterel = &entry->remoterel;

	/* Ensure we don't leak a relcache refcount. */
	if (entry->localrel)
		elog(ERROR, "remote relation ID %u is already open", remoteid);

	/*
	 * When opening and locking a relation, pending invalidation messages are
	 * processed which can invalidate the relation.  Hence, if the entry is
	 * currently considered valid, try to open the local relation by OID and
	 * see if invalidation ensues.
	 */
	if (entry->localrelvalid)
	{
		entry->localrel = try_relation_open(entry->localreloid, lockmode);
		if (!entry->localrel)
		{
			/* Table was renamed or dropped. */
			entry->localrelvalid = false;
		}
		else if (!entry->localrelvalid)
		{
			/* Note we release the no-longer-useful lock here. */
			heap_close(entry->localrel, lockmode);
			entry->localrel = NULL;
		}
	}

	/*
	 * If the entry has been marked invalid since we last had lock on it,
	 * re-open the local relation by name and rebuild all derived data.
	 */
	if (!entry->localrelvalid)
	{
		Oid			relid;
		int			nmatched;	/* local columns matched to a remote one */
		Bitmapset  *idkey;
		TupleDesc	desc;
		MemoryContext oldctx;
		int			i;

		/*
		 * Release the attribute map left over from the previous validation,
		 * if any.  It lives in the long-lived LogicalRepRelMapContext, so
		 * simply overwriting the pointer below would leak it on every
		 * rebuild.  Clear the pointer as well, so that an error raised
		 * before the new map is installed cannot leave a dangling reference
		 * behind for logicalrep_relmap_free_entry().
		 */
		if (entry->attrmap)
		{
			pfree(entry->attrmap);
			entry->attrmap = NULL;
		}

		/* Try to find and lock the relation by name. */
		relid = RangeVarGetRelid(makeRangeVar(remoterel->nspname,
											  remoterel->relname, -1),
								 lockmode, true);
		if (!OidIsValid(relid))
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication target relation \"%s.%s\" does not exist",
							remoterel->nspname, remoterel->relname)));

		/* The lock was already taken by the name lookup above. */
		entry->localrel = heap_open(relid, NoLock);
		entry->localreloid = relid;

		/* Check for supported relkind. */
		CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
								 remoterel->nspname, remoterel->relname);

		/*
		 * Build the mapping of local attribute numbers to remote attribute
		 * numbers and validate that we don't miss any replicated columns as
		 * that would result in potentially unwanted data loss.
		 */
		desc = RelationGetDescr(entry->localrel);
		oldctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
		entry->attrmap = palloc(desc->natts * sizeof(AttrNumber));
		MemoryContextSwitchTo(oldctx);

		nmatched = 0;
		for (i = 0; i < desc->natts; i++)
		{
			int			attnum;

			/* Dropped local columns never map to a remote column. */
			if (desc->attrs[i]->attisdropped)
			{
				entry->attrmap[i] = -1;
				continue;
			}

			attnum = logicalrep_rel_att_by_name(remoterel,
												NameStr(desc->attrs[i]->attname));

			entry->attrmap[i] = attnum;
			if (attnum >= 0)
				nmatched++;
		}

		/* TODO, detail message with names of missing columns */
		if (nmatched < remoterel->natts)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication target relation \"%s.%s\" is missing "
							"some replicated columns",
							remoterel->nspname, remoterel->relname)));

		/*
		 * Check that replica identity matches. We allow for stricter replica
		 * identity (fewer columns) on subscriber as that will not stop us
		 * from finding unique tuple. IE, if publisher has identity
		 * (id,timestamp) and subscriber just (id) this will not be a problem,
		 * but in the opposite scenario it will.
		 *
		 * Don't throw any error here just mark the relation entry as not
		 * updatable, as replica identity is only for updates and deletes but
		 * inserts can be replicated even without it.
		 */
		entry->updatable = true;
		idkey = RelationGetIndexAttrBitmap(entry->localrel,
										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
		/* fallback to PK if no replica identity */
		if (idkey == NULL)
		{
			idkey = RelationGetIndexAttrBitmap(entry->localrel,
											   INDEX_ATTR_BITMAP_PRIMARY_KEY);

			/*
			 * If no replica identity index and no PK, the published table
			 * must have replica identity FULL.
			 */
			if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL)
				entry->updatable = false;
		}

		i = -1;
		while ((i = bms_next_member(idkey, i)) >= 0)
		{
			/* Bitmap members are offset by FirstLowInvalidHeapAttributeNumber. */
			int			attnum = i + FirstLowInvalidHeapAttributeNumber;

			if (!AttrNumberIsForUserDefinedAttr(attnum))
				ereport(ERROR,
						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						 errmsg("logical replication target relation \"%s.%s\" uses "
								"system columns in REPLICA IDENTITY index",
								remoterel->nspname, remoterel->relname)));

			attnum = AttrNumberGetAttrOffset(attnum);

			/*
			 * Every local identity column must map to a remote column that
			 * is part of the remote key; otherwise updates and deletes
			 * cannot be applied.
			 */
			if (entry->attrmap[attnum] < 0 ||
				!bms_is_member(entry->attrmap[attnum], remoterel->attkeys))
			{
				entry->updatable = false;
				break;
			}
		}

		entry->localrelvalid = true;
	}

	/* Refresh the sync state unless the relation is already marked ready. */
	if (entry->state != SUBREL_STATE_READY)
		entry->state = GetSubscriptionRelState(MySubscription->oid,
											   entry->localreloid,
											   &entry->statelsn,
											   true);

	return entry;
}
/*
 * Close the previously opened logical relation.
 *
 * Closes rel->localrel using "lockmode" and clears the pointer, so that a
 * later logicalrep_rel_open() on the same entry does not report it as
 * already open.
 */
void
logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
{
heap_close(rel->localrel, lockmode);
/* Mark the entry as no longer open. */
rel->localrel = NULL;
}
/*
 * Release the strings owned by a type map cache entry.
 *
 * Frees the schema name and the type name; the entry itself is not freed.
 */
static void
logicalrep_typmap_free_entry(LogicalRepTyp *entry)
{
	char	   *nspname = entry->nspname;
	char	   *typname = entry->typname;

	pfree(nspname);
	pfree(typname);
}
/*
 * Insert or replace the cache entry describing a remote type.
 *
 * The strings of an existing entry are freed first; the schema and type
 * names from "remotetyp" are then copied into LogicalRepRelMapContext.
 */
void
logicalrep_typmap_update(LogicalRepTyp *remotetyp)
{
	LogicalRepTyp *cached;
	MemoryContext callerctx;
	bool		existed;

	if (LogicalRepTypMap == NULL)
		logicalrep_relmap_init();

	/*
	 * HASH_ENTER returns the existing entry if present or creates a new one.
	 */
	cached = hash_search(LogicalRepTypMap, (void *) &remotetyp->remoteid,
						 HASH_ENTER, &existed);

	if (existed)
		logicalrep_typmap_free_entry(cached);

	/* Make cached copy of the data */
	cached->remoteid = remotetyp->remoteid;

	callerctx = MemoryContextSwitchTo(LogicalRepRelMapContext);
	cached->nspname = pstrdup(remotetyp->nspname);
	cached->typname = pstrdup(remotetyp->typname);
	MemoryContextSwitchTo(callerctx);
}
/*
 * Fetch a printable type name for a remote type OID.
 *
 * OIDs below FirstBootstrapObjectId are treated as built-in types and are
 * formatted locally; anything else is looked up in the type map cache and
 * returned as "schema.typename".
 *
 * Returns the substitute string "unrecognized <oid>" if the type cannot be
 * found; no message is sent to the log in that case, because this is used
 * by an error callback already.
 */
char *
logicalrep_typmap_gettypname(Oid remoteid)
{
	/* Internal types are mapped directly. */
	if (remoteid < FirstBootstrapObjectId)
	{
		if (get_typisdefined(remoteid))
			return format_type_be(remoteid);

		/*
		 * This can be caused by having a publisher with a higher PostgreSQL
		 * major version than the subscriber.
		 */
		return psprintf("unrecognized %u", remoteid);
	}

	/*
	 * If the typemap is not initialized yet, we cannot possibly attempt to
	 * search the hash table; but there's no way we know the type locally
	 * yet, since we haven't received a message about this type, so falling
	 * through to the substitute value is the best we can do.
	 */
	if (LogicalRepTypMap != NULL)
	{
		LogicalRepTyp *hit;
		bool		present;

		/* search the mapping */
		hit = hash_search(LogicalRepTypMap, (void *) &remoteid,
						  HASH_FIND, &present);
		if (present)
		{
			Assert(OidIsValid(hit->remoteid));
			return psprintf("%s.%s", hit->nspname, hit->typname);
		}
	}

	return psprintf("unrecognized %u", remoteid);
}