Add fast path for foreign key constraint checks

Add a fast-path optimization for foreign key checks that bypasses SPI
by directly probing the unique index on the referenced table.
Benchmarking shows ~1.8x speedup for bulk FK inserts (int PK/int FK,
1M rows, where PK table and index are cached).

The fast path applies when the referenced table is not partitioned and
the constraint does not involve temporal semantics.  Otherwise, the
existing SPI path is used.

This optimization covers only the referential check trigger
(RI_FKey_check).  The action triggers (CASCADE, SET NULL, SET DEFAULT,
RESTRICT, NO ACTION) must find rows on the FK side to modify, which
requires a table scan with no guaranteed index available, and then
execute DML against those rows through the full executor path including
any triggered actions.  Replicating that without substantial code
duplication is not feasible, so those triggers remain on the SPI path.
Extending the fast path to action triggers remains possible as future
work if the necessary infrastructure is built.

The new ri_FastPathCheck() function extracts the FK values, builds scan
keys, performs an index scan, and locks the matching tuple with
LockTupleKeyShare via ri_LockPKTuple(), which handles the RI-specific
subset of table_tuple_lock() results.

If the locked tuple was reached by chasing an update chain
(tmfd.traversed), recheck_matched_pk_tuple() verifies that the key
is still the same, emulating EvalPlanQual.

The scan uses GetTransactionSnapshot(), matching what the SPI path
uses (via _SPI_execute_plan pushing GetTransactionSnapshot() as the
active snapshot).  Under READ COMMITTED this is a fresh snapshot;
under REPEATABLE READ / SERIALIZABLE it is the frozen transaction-
start snapshot, so PK rows committed after the transaction started
are not visible.

The ri_CheckPermissions() function performs schema USAGE and table
SELECT checks, matching what the SPI path gets implicitly through
the executor's permission checks.  The fast path also switches to
the PK table owner's security context (with SECURITY_NOFORCE_RLS)
before the index probe, matching the SPI path where the query runs
as the table owner.

ri_HashCompareOp() is adjusted to handle cross-type equality operators
(e.g. int48eq for int4 PK / int8 FK) which can appear in conpfeqop.
The existing code asserted same-type operators only, which was correct
for its existing callers (ri_KeysEqual compares same-type FK column
values via ff_eq_oprs), but the fast path is the first caller to pass
pf_eq_oprs, which can be cross-type.

Per-key metadata (compare entries, operator procedures, strategy
numbers) is cached in RI_ConstraintInfo via
ri_populate_fastpath_metadata() on first use, eliminating repeated
calls to ri_HashCompareOp() and get_op_opfamily_properties().
conindid and pk_is_partitioned are also cached at constraint load
time, avoiding per-invocation syscache lookups and the need to open
pk_rel before deciding whether the fast path applies.

New regression tests cover RLS bypass and ACL enforcement for the
fast-path permission checks.  New isolation tests exercise concurrent
PK updates under both READ COMMITTED and REPEATABLE READ.

Author: Junwang Zhao <zhjwpku@gmail.com>
Co-authored-by: Amit Langote <amitlangote09@gmail.com>
Reviewed-by: Haibo Yan <tristan.yim@gmail.com>
Tested-by: Tomas Vondra <tomas@vondra.me>
Discussion: https://postgr.es/m/CA+HiwqF4C0ws3cO+z5cLkPuvwnAwkSp7sfvgGj3yQ=Li6KNMqA@mail.gmail.com
This commit is contained in:
Amit Langote 2026-03-31 13:49:21 +09:00
parent 5984ea868e
commit 2da86c1ef9
7 changed files with 723 additions and 14 deletions

View file

@ -24,12 +24,15 @@
#include "postgres.h"
#include "access/htup_details.h"
#include "access/skey.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/index.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_namespace.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/spi.h"
@ -91,6 +94,7 @@
#define RI_TRIGTYPE_UPDATE 2
#define RI_TRIGTYPE_DELETE 3
typedef struct FastPathMeta FastPathMeta;
/*
* RI_ConstraintInfo
@ -132,8 +136,24 @@ typedef struct RI_ConstraintInfo
Oid period_intersect_oper; /* anyrange * anyrange (or
* multiranges) */
dlist_node valid_link; /* Link in list of valid entries */
Oid conindid;
bool pk_is_partitioned;
FastPathMeta *fpmeta;
} RI_ConstraintInfo;
typedef struct RI_CompareHashEntry RI_CompareHashEntry;
/* Fast-path metadata for RI checks on foreign key referencing tables */
typedef struct FastPathMeta
{
RI_CompareHashEntry *compare_entries[RI_MAX_NUMKEYS];
RegProcedure regops[RI_MAX_NUMKEYS];
Oid subtypes[RI_MAX_NUMKEYS];
int strats[RI_MAX_NUMKEYS];
} FastPathMeta;
/*
* RI_QueryKey
*
@ -233,6 +253,23 @@ static bool ri_PerformCheck(const RI_ConstraintInfo *riinfo,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool is_restrict,
bool detectNewRows, int expect_OK);
static void ri_FastPathCheck(const RI_ConstraintInfo *riinfo,
Relation fk_rel, TupleTableSlot *newslot);
static bool ri_FastPathProbeOne(Relation pk_rel, Relation idx_rel,
IndexScanDesc scandesc, TupleTableSlot *slot,
Snapshot snapshot, const RI_ConstraintInfo *riinfo,
ScanKeyData *skey, int nkeys);
static bool ri_LockPKTuple(Relation pk_rel, TupleTableSlot *slot, Snapshot snap,
bool *concurrently_updated);
static bool ri_fastpath_is_applicable(const RI_ConstraintInfo *riinfo);
static void ri_CheckPermissions(Relation query_rel);
static bool recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
TupleTableSlot *new_slot);
static void build_index_scankeys(const RI_ConstraintInfo *riinfo,
Relation idx_rel, Datum *pk_vals,
char *pk_nulls, ScanKey skeys);
static void ri_populate_fastpath_metadata(RI_ConstraintInfo *riinfo,
Relation fk_rel, Relation idx_rel);
static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
const RI_ConstraintInfo *riinfo, bool rel_is_pk,
Datum *vals, char *nulls);
@ -276,14 +313,7 @@ RI_FKey_check(TriggerData *trigdata)
if (!table_tuple_satisfies_snapshot(trigdata->tg_relation, newslot, SnapshotSelf))
return PointerGetDatum(NULL);
/*
* Get the relation descriptors of the FK and PK tables.
*
* pk_rel is opened in RowShareLock mode since that's what our eventual
* SELECT FOR KEY SHARE will get on it.
*/
fk_rel = trigdata->tg_relation;
pk_rel = table_open(riinfo->pk_relid, RowShareLock);
switch (ri_NullCheck(RelationGetDescr(fk_rel), newslot, riinfo, false))
{
@ -293,7 +323,6 @@ RI_FKey_check(TriggerData *trigdata)
* No further check needed - an all-NULL key passes every type of
* foreign key constraint.
*/
table_close(pk_rel, RowShareLock);
return PointerGetDatum(NULL);
case RI_KEYS_SOME_NULL:
@ -318,7 +347,6 @@ RI_FKey_check(TriggerData *trigdata)
errdetail("MATCH FULL does not allow mixing of null and nonnull key values."),
errtableconstraint(fk_rel,
NameStr(riinfo->conname))));
table_close(pk_rel, RowShareLock);
return PointerGetDatum(NULL);
case FKCONSTR_MATCH_SIMPLE:
@ -327,7 +355,6 @@ RI_FKey_check(TriggerData *trigdata)
* MATCH SIMPLE - if ANY column is null, the key passes
* the constraint.
*/
table_close(pk_rel, RowShareLock);
return PointerGetDatum(NULL);
#ifdef NOT_USED
@ -352,8 +379,31 @@ RI_FKey_check(TriggerData *trigdata)
break;
}
/*
* Fast path: probe the PK unique index directly, bypassing SPI.
*
* For non-partitioned, non-temporal FKs, we can skip the SPI machinery
* (plan cache, executor setup, etc.) and do a direct index scan + tuple
* lock. This is semantically equivalent to the SPI path below but avoids
* the per-row executor overhead.
*
* ri_FastPathCheck() reports the violation itself (via ereport) if no
* matching PK row is found, so it only returns on success.
*/
if (ri_fastpath_is_applicable(riinfo))
{
ri_FastPathCheck(riinfo, fk_rel, newslot);
return PointerGetDatum(NULL);
}
SPI_connect();
/*
* pk_rel is opened in RowShareLock mode since that's what our eventual
* SELECT FOR KEY SHARE will get on it.
*/
pk_rel = table_open(riinfo->pk_relid, RowShareLock);
/* Fetch or prepare a saved plan for the real check */
ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
@ -2356,6 +2406,12 @@ ri_LoadConstraintInfo(Oid constraintOid)
riinfo->valid = true;
riinfo->conindid = conForm->conindid;
riinfo->pk_is_partitioned =
(get_rel_relkind(riinfo->pk_relid) == RELKIND_PARTITIONED_TABLE);
riinfo->fpmeta = NULL;
return riinfo;
}
@ -2623,6 +2679,379 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
return SPI_processed != 0;
}
/*
* ri_FastPathCheck
* Perform FK existence check via direct index probe, bypassing SPI.
*
* If no matching PK row exists, report the violation via ri_ReportViolation(),
* otherwise, the function returns normally.
*/
static void
ri_FastPathCheck(const RI_ConstraintInfo *riinfo,
Relation fk_rel, TupleTableSlot *newslot)
{
Relation pk_rel;
Relation idx_rel;
IndexScanDesc scandesc;
TupleTableSlot *slot;
Datum pk_vals[INDEX_MAX_KEYS];
char pk_nulls[INDEX_MAX_KEYS];
ScanKeyData skey[INDEX_MAX_KEYS];
bool found = false;
Oid saved_userid;
int saved_sec_context;
Snapshot snapshot;
/*
* Advance the command counter so the snapshot sees the effects of prior
* triggers in this statement. Mirrors what the SPI path does in
* ri_PerformCheck().
*/
CommandCounterIncrement();
snapshot = RegisterSnapshot(GetTransactionSnapshot());
pk_rel = table_open(riinfo->pk_relid, RowShareLock);
idx_rel = index_open(riinfo->conindid, AccessShareLock);
slot = table_slot_create(pk_rel, NULL);
scandesc = index_beginscan(pk_rel, idx_rel,
snapshot, NULL,
riinfo->nkeys, 0,
SO_NONE);
if (riinfo->fpmeta == NULL)
ri_populate_fastpath_metadata((RI_ConstraintInfo *) riinfo,
fk_rel, idx_rel);
Assert(riinfo->fpmeta);
GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
saved_sec_context |
SECURITY_LOCAL_USERID_CHANGE |
SECURITY_NOFORCE_RLS);
ri_CheckPermissions(pk_rel);
ri_ExtractValues(fk_rel, newslot, riinfo, false, pk_vals, pk_nulls);
build_index_scankeys(riinfo, idx_rel, pk_vals, pk_nulls, skey);
found = ri_FastPathProbeOne(pk_rel, idx_rel, scandesc, slot,
snapshot, riinfo, skey, riinfo->nkeys);
SetUserIdAndSecContext(saved_userid, saved_sec_context);
index_endscan(scandesc);
ExecDropSingleTupleTableSlot(slot);
UnregisterSnapshot(snapshot);
if (!found)
ri_ReportViolation(riinfo, pk_rel, fk_rel,
newslot, NULL,
RI_PLAN_CHECK_LOOKUPPK, false, false);
index_close(idx_rel, NoLock);
table_close(pk_rel, NoLock);
}
/*
* ri_FastPathProbeOne
* Probe the PK index for one set of scan keys, lock the matching
* tuple
*
* Returns true if a matching PK row was found, locked, and (if
* applicable) visible to the transaction snapshot.
*/
static bool
ri_FastPathProbeOne(Relation pk_rel, Relation idx_rel,
IndexScanDesc scandesc, TupleTableSlot *slot,
Snapshot snapshot, const RI_ConstraintInfo *riinfo,
ScanKeyData *skey, int nkeys)
{
bool found = false;
index_rescan(scandesc, skey, nkeys, NULL, 0);
if (index_getnext_slot(scandesc, ForwardScanDirection, slot))
{
bool concurrently_updated;
if (ri_LockPKTuple(pk_rel, slot, snapshot,
&concurrently_updated))
{
if (concurrently_updated)
found = recheck_matched_pk_tuple(idx_rel, skey, slot);
else
found = true;
}
}
return found;
}
/*
* ri_LockPKTuple
* Lock a PK tuple found by the fast-path index scan.
*
* Calls table_tuple_lock() directly with handling specific to RI checks.
* Returns true if the tuple was successfully locked.
*
* Sets *concurrently_updated to true if the locked tuple was reached
* by following an update chain (tmfd.traversed), indicating the caller
* should recheck the key.
*/
static bool
ri_LockPKTuple(Relation pk_rel, TupleTableSlot *slot, Snapshot snap,
bool *concurrently_updated)
{
TM_FailureData tmfd;
TM_Result result;
int lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
*concurrently_updated = false;
if (!IsolationUsesXactSnapshot())
lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
result = table_tuple_lock(pk_rel, &slot->tts_tid, snap,
slot, GetCurrentCommandId(false),
LockTupleKeyShare, LockWaitBlock,
lockflags, &tmfd);
switch (result)
{
case TM_Ok:
if (tmfd.traversed)
*concurrently_updated = true;
return true;
case TM_Deleted:
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
return false;
case TM_Updated:
if (IsolationUsesXactSnapshot())
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
/*
* In READ COMMITTED, FIND_LAST_VERSION should have chased the
* chain and returned TM_Ok. Getting here means something
* unexpected -- fall through to error.
*/
elog(ERROR, "unexpected table_tuple_lock status: %u", result);
break;
case TM_SelfModified:
/*
* The current command or a later command in this transaction
* modified the PK row. This shouldn't normally happen during an
* FK check (we're not modifying pk_rel), but handle it safely by
* treating the tuple as not found.
*/
return false;
case TM_Invisible:
elog(ERROR, "attempted to lock invisible tuple");
break;
default:
elog(ERROR, "unrecognized table_tuple_lock status: %u", result);
break;
}
return false; /* keep compiler quiet */
}
static bool
ri_fastpath_is_applicable(const RI_ConstraintInfo *riinfo)
{
/*
* Partitioned referenced tables are skipped for simplicity, since they
* require routing the probe through the correct partition using
* PartitionDirectory.
*/
if (riinfo->pk_is_partitioned)
return false;
/*
* Temporal foreign keys use range overlap and containment semantics (&&,
* <@, range_agg()) that inherently involve aggregation and multiple-row
* reasoning, so they stay on the SPI path.
*/
if (riinfo->hasperiod)
return false;
return true;
}
/*
* ri_CheckPermissions
* Check that the current user has permissions to look into the schema of
* and SELECT from 'query_rel'
*/
static void
ri_CheckPermissions(Relation query_rel)
{
AclResult aclresult;
/* USAGE on schema. */
aclresult = object_aclcheck(NamespaceRelationId,
RelationGetNamespace(query_rel),
GetUserId(), ACL_USAGE);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, OBJECT_SCHEMA,
get_namespace_name(RelationGetNamespace(query_rel)));
/* SELECT on relation. */
aclresult = pg_class_aclcheck(RelationGetRelid(query_rel), GetUserId(),
ACL_SELECT);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, OBJECT_TABLE,
RelationGetRelationName(query_rel));
}
/*
* recheck_matched_pk_tuple
* After following an update chain (tmfd.traversed), verify that
* the locked PK tuple still matches the original search keys.
*
* A non-key update (e.g. changing a non-PK column) creates a new tuple version
* that we've now locked, but the key is unchanged -- that's fine. A key
* update means the value we were looking for is gone, so we should treat it as
* not found.
*/
static bool
recheck_matched_pk_tuple(Relation idxrel, ScanKeyData *skeys,
TupleTableSlot *new_slot)
{
/*
* TODO: BuildIndexInfo does a syscache lookup + palloc on every call.
* This only fires on the concurrent-update path (tmfd.traversed), which
* should be rare, so the cost is acceptable for now. If profiling shows
* otherwise, cache the IndexInfo in FastPathMeta.
*/
IndexInfo *indexInfo = BuildIndexInfo(idxrel);
Datum values[INDEX_MAX_KEYS];
bool isnull[INDEX_MAX_KEYS];
bool matched = true;
/* PK indexes never have these. */
Assert(indexInfo->ii_Expressions == NIL &&
indexInfo->ii_ExclusionOps == NULL);
/* Form the index values and isnull flags given the table tuple. */
FormIndexDatum(indexInfo, new_slot, NULL, values, isnull);
for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
{
ScanKeyData *skey = &skeys[i];
/* A PK column can never be set to NULL. */
Assert(!isnull[i]);
if (!DatumGetBool(FunctionCall2Coll(&skey->sk_func,
skey->sk_collation,
values[i],
skey->sk_argument)))
{
matched = false;
break;
}
}
return matched;
}
/*
* build_index_scankeys
* Build ScanKeys for a direct index probe of the PK's unique index.
*
* Uses cached compare entries, operator procedures, and strategy numbers
* from ri_populate_fastpath_metadata() rather than looking them up on
* each invocation. Casts FK values to the operator's expected input
* type if needed.
*/
static void
build_index_scankeys(const RI_ConstraintInfo *riinfo,
Relation idx_rel, Datum *pk_vals,
char *pk_nulls, ScanKey skeys)
{
FastPathMeta *fpmeta = riinfo->fpmeta;
Assert(fpmeta);
/*
* May need to cast each of the individual values of the foreign key to
* the corresponding PK column's type if the equality operator demands it.
*/
for (int i = 0; i < riinfo->nkeys; i++)
{
if (pk_nulls[i] != 'n')
{
RI_CompareHashEntry *entry = fpmeta->compare_entries[i];
if (OidIsValid(entry->cast_func_finfo.fn_oid))
pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
pk_vals[i],
Int32GetDatum(-1), /* typmod */
BoolGetDatum(false)); /* implicit coercion */
}
}
/*
* Set up ScanKeys for the index scan. This is essentially how
* ExecIndexBuildScanKeys() sets them up.
*/
for (int i = 0; i < riinfo->nkeys; i++)
{
int pkattrno = i + 1;
ScanKeyEntryInitialize(&skeys[i], 0, pkattrno,
fpmeta->strats[i], fpmeta->subtypes[i],
idx_rel->rd_indcollation[i], fpmeta->regops[i],
pk_vals[i]);
}
}
/*
* ri_populate_fastpath_metadata
* Cache per-key metadata needed by build_index_scankeys().
*
* Looks up the compare hash entry, operator procedure OID, and index
* strategy/subtype for each key column. Called lazily on first use
* and persists for the lifetime of the RI_ConstraintInfo entry.
*/
static void
ri_populate_fastpath_metadata(RI_ConstraintInfo *riinfo,
Relation fk_rel, Relation idx_rel)
{
FastPathMeta *fpmeta;
MemoryContext oldcxt = MemoryContextSwitchTo(TopMemoryContext);
Assert(riinfo != NULL && riinfo->valid);
fpmeta = palloc_object(FastPathMeta);
for (int i = 0; i < riinfo->nkeys; i++)
{
Oid eq_opr = riinfo->pf_eq_oprs[i];
Oid typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
Oid lefttype;
RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
fpmeta->compare_entries[i] = entry;
fpmeta->regops[i] = get_opcode(eq_opr);
get_op_opfamily_properties(eq_opr,
idx_rel->rd_opfamily[i],
false,
&fpmeta->strats[i],
&lefttype,
&fpmeta->subtypes[i]);
}
riinfo->fpmeta = fpmeta;
MemoryContextSwitchTo(oldcxt);
}
/*
* Extract fields from a tuple into Datum/nulls arrays
*/
@ -3118,8 +3547,11 @@ ri_CompareWithCast(Oid eq_opr, Oid typeid, Oid collid,
/*
* ri_HashCompareOp -
*
* See if we know how to compare two values, and create a new hash entry
* if not.
* Look up or create a cache entry for the given equality operator and
* the caller's value type (typeid). The entry holds the operator's
* FmgrInfo and, if typeid doesn't match what the operator expects as
* its right-hand input, a cast function to coerce the value before
* comparison.
*/
static RI_CompareHashEntry *
ri_HashCompareOp(Oid eq_opr, Oid typeid)
@ -3175,8 +3607,14 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
* moment since that will never be generated for implicit coercions.
*/
op_input_types(eq_opr, &lefttype, &righttype);
Assert(lefttype == righttype);
if (typeid == lefttype)
/*
* pf_eq_oprs (used by the fast path) can be cross-type when the FK
* and PK columns differ in type, e.g. int48eq for int4 PK / int8 FK.
* If the FK column's type already matches what the operator expects
* as its right-hand input, no cast is needed.
*/
if (typeid == righttype)
castfunc = InvalidOid; /* simplest case */
else
{

View file

@ -0,0 +1,105 @@
Parsed test spec with 3 sessions
starting permutation: s2b s2ukey s1b s1i s2c s1c s2s s1s
step s2b: BEGIN;
step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
step s1b: BEGIN;
step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
step s2c: COMMIT;
step s1i: <... completed>
ERROR: insert or update on table "child" violates foreign key constraint "child_parent_key_fkey"
step s1c: COMMIT;
step s2s: SELECT * FROM parent;
parent_key|aux
----------+---
2|foo
(1 row)
step s1s: SELECT * FROM child;
child_key|parent_key
---------+----------
(0 rows)
starting permutation: s2b s2uaux s1b s1i s2c s1c s2s s1s
step s2b: BEGIN;
step s2uaux: UPDATE parent SET aux = 'bar' WHERE parent_key = 1;
step s1b: BEGIN;
step s1i: INSERT INTO child VALUES (1, 1);
step s2c: COMMIT;
step s1c: COMMIT;
step s2s: SELECT * FROM parent;
parent_key|aux
----------+---
1|bar
(1 row)
step s1s: SELECT * FROM child;
child_key|parent_key
---------+----------
1| 1
(1 row)
starting permutation: s2b s2ukey s1b s1i s2ukey2 s2c s1c s2s s1s
step s2b: BEGIN;
step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
step s1b: BEGIN;
step s1i: INSERT INTO child VALUES (1, 1); <waiting ...>
step s2ukey2: UPDATE parent SET parent_key = 1 WHERE parent_key = 2;
step s2c: COMMIT;
step s1i: <... completed>
step s1c: COMMIT;
step s2s: SELECT * FROM parent;
parent_key|aux
----------+---
1|foo
(1 row)
step s1s: SELECT * FROM child;
child_key|parent_key
---------+----------
1| 1
(1 row)
starting permutation: s2b s2ukey s3b s3i s2c s3c s2s s3s
step s2b: BEGIN;
step s2ukey: UPDATE parent SET parent_key = 2 WHERE parent_key = 1;
step s3b: BEGIN ISOLATION LEVEL REPEATABLE READ;
step s3i: INSERT INTO child VALUES (2, 1); <waiting ...>
step s2c: COMMIT;
step s3i: <... completed>
ERROR: could not serialize access due to concurrent update
step s3c: COMMIT;
step s2s: SELECT * FROM parent;
parent_key|aux
----------+---
2|foo
(1 row)
step s3s: SELECT * FROM child;
child_key|parent_key
---------+----------
(0 rows)
starting permutation: s2b s2uaux s3b s3i s2c s3c s2s s3s
step s2b: BEGIN;
step s2uaux: UPDATE parent SET aux = 'bar' WHERE parent_key = 1;
step s3b: BEGIN ISOLATION LEVEL REPEATABLE READ;
step s3i: INSERT INTO child VALUES (2, 1);
step s2c: COMMIT;
step s3c: COMMIT;
step s2s: SELECT * FROM parent;
parent_key|aux
----------+---
1|bar
(1 row)
step s3s: SELECT * FROM child;
child_key|parent_key
---------+----------
2| 1
(1 row)

View file

@ -37,6 +37,7 @@ test: fk-partitioned-2
test: fk-snapshot
test: fk-snapshot-2
test: fk-snapshot-3
test: fk-concurrent-pk-upd
test: subxid-overflow
test: eval-plan-qual
test: eval-plan-qual-trigger

View file

@ -0,0 +1,53 @@
# Tests that an INSERT on referencing table correctly fails when
# the referenced value disappears due to a concurrent update
setup
{
CREATE TABLE parent (
parent_key int PRIMARY KEY,
aux text NOT NULL
);
CREATE TABLE child (
child_key int PRIMARY KEY,
parent_key int8 NOT NULL REFERENCES parent
);
INSERT INTO parent VALUES (1, 'foo');
}
teardown
{
DROP TABLE parent, child;
}
session s1
step s1b { BEGIN; }
step s1i { INSERT INTO child VALUES (1, 1); }
step s1c { COMMIT; }
step s1s { SELECT * FROM child; }
session s2
step s2b { BEGIN; }
step s2ukey { UPDATE parent SET parent_key = 2 WHERE parent_key = 1; }
step s2uaux { UPDATE parent SET aux = 'bar' WHERE parent_key = 1; }
step s2ukey2 { UPDATE parent SET parent_key = 1 WHERE parent_key = 2; }
step s2c { COMMIT; }
step s2s { SELECT * FROM parent; }
session s3
step s3b { BEGIN ISOLATION LEVEL REPEATABLE READ; }
step s3i { INSERT INTO child VALUES (2, 1); }
step s3c { COMMIT; }
step s3s { SELECT * FROM child; }
# fail
permutation s2b s2ukey s1b s1i s2c s1c s2s s1s
# ok
permutation s2b s2uaux s1b s1i s2c s1c s2s s1s
# ok
permutation s2b s2ukey s1b s1i s2ukey2 s2c s1c s2s s1s
# RR: key update -> serialization failure
permutation s2b s2ukey s3b s3i s2c s3c s2s s3s
# RR: non-key update -> old version visible via transaction snapshot
permutation s2b s2uaux s3b s3i s2c s3c s2s s3s

View file

@ -370,6 +370,53 @@ SELECT * FROM PKTABLE;
DROP TABLE FKTABLE;
DROP TABLE PKTABLE;
--
-- Check RLS
--
CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
-- Insert test data into PKTABLE
INSERT INTO PKTABLE VALUES (1, 'Test1');
INSERT INTO PKTABLE VALUES (2, 'Test2');
INSERT INTO PKTABLE VALUES (3, 'Test3');
-- Grant privileges on PKTABLE/FKTABLE to user regress_foreign_key_user
CREATE USER regress_foreign_key_user NOLOGIN;
GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
GRANT SELECT, INSERT ON FKTABLE TO regress_foreign_key_user;
-- Enable RLS on PKTABLE and Create policies
ALTER TABLE PKTABLE ENABLE ROW LEVEL SECURITY;
CREATE POLICY pktable_view_odd_policy ON PKTABLE TO regress_foreign_key_user USING (ptest1 % 2 = 1);
ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
SET ROLE regress_foreign_key_user;
INSERT INTO FKTABLE VALUES (3, 5);
INSERT INTO FKTABLE VALUES (2, 5); -- success, REFERENCES are not subject to row security
RESET ROLE;
DROP TABLE FKTABLE;
DROP TABLE PKTABLE;
DROP USER regress_foreign_key_user;
--
-- Check ACL
--
CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
-- Insert test data into PKTABLE
INSERT INTO PKTABLE VALUES (1, 'Test1');
INSERT INTO PKTABLE VALUES (2, 'Test2');
INSERT INTO PKTABLE VALUES (3, 'Test3');
-- Grant usage on PKTABLE to user regress_foreign_key_user
CREATE USER regress_foreign_key_user NOLOGIN;
GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
-- Inserting into FKTABLE should work
INSERT INTO FKTABLE VALUES (3, 5);
-- Revoke usage on PKTABLE from user regress_foreign_key_user
REVOKE SELECT ON PKTABLE FROM regress_foreign_key_user;
-- Inserting into FKTABLE should fail
INSERT INTO FKTABLE VALUES (2, 6);
ERROR: permission denied for table pktable
DROP TABLE FKTABLE;
DROP TABLE PKTABLE;
DROP USER regress_foreign_key_user;
--
-- Check initial check upon ALTER TABLE
--
CREATE TABLE PKTABLE ( ptest1 int, ptest2 int, PRIMARY KEY(ptest1, ptest2) );

View file

@ -242,6 +242,70 @@ SELECT * FROM PKTABLE;
DROP TABLE FKTABLE;
DROP TABLE PKTABLE;
--
-- Check RLS
--
CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
-- Insert test data into PKTABLE
INSERT INTO PKTABLE VALUES (1, 'Test1');
INSERT INTO PKTABLE VALUES (2, 'Test2');
INSERT INTO PKTABLE VALUES (3, 'Test3');
-- Grant privileges on PKTABLE/FKTABLE to user regress_foreign_key_user
CREATE USER regress_foreign_key_user NOLOGIN;
GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
GRANT SELECT, INSERT ON FKTABLE TO regress_foreign_key_user;
-- Enable RLS on PKTABLE and Create policies
ALTER TABLE PKTABLE ENABLE ROW LEVEL SECURITY;
CREATE POLICY pktable_view_odd_policy ON PKTABLE TO regress_foreign_key_user USING (ptest1 % 2 = 1);
ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
SET ROLE regress_foreign_key_user;
INSERT INTO FKTABLE VALUES (3, 5);
INSERT INTO FKTABLE VALUES (2, 5); -- success, REFERENCES are not subject to row security
RESET ROLE;
DROP TABLE FKTABLE;
DROP TABLE PKTABLE;
DROP USER regress_foreign_key_user;
--
-- Check ACL
--
CREATE TABLE PKTABLE ( ptest1 int PRIMARY KEY, ptest2 text );
CREATE TABLE FKTABLE ( ftest1 int REFERENCES PKTABLE, ftest2 int );
-- Insert test data into PKTABLE
INSERT INTO PKTABLE VALUES (1, 'Test1');
INSERT INTO PKTABLE VALUES (2, 'Test2');
INSERT INTO PKTABLE VALUES (3, 'Test3');
-- Grant usage on PKTABLE to user regress_foreign_key_user
CREATE USER regress_foreign_key_user NOLOGIN;
GRANT SELECT ON PKTABLE TO regress_foreign_key_user;
ALTER TABLE PKTABLE OWNER to regress_foreign_key_user;
-- Inserting into FKTABLE should work
INSERT INTO FKTABLE VALUES (3, 5);
-- Revoke usage on PKTABLE from user regress_foreign_key_user
REVOKE SELECT ON PKTABLE FROM regress_foreign_key_user;
-- Inserting into FKTABLE should fail
INSERT INTO FKTABLE VALUES (2, 6);
DROP TABLE FKTABLE;
DROP TABLE PKTABLE;
DROP USER regress_foreign_key_user;
--
-- Check initial check upon ALTER TABLE
--

View file

@ -817,6 +817,7 @@ ExtensionInfo
ExtensionLocation
ExtensionSiblingCache
ExtensionVersionInfo
FastPathMeta
FDWCollateState
FD_SET
FILE