Add callback for I/O error messages in SLRUs

Historically, all SLRUs were addressed by transaction IDs, but that
hasn't been true for a long time. However, the error message on I/O
error still always talked about accessing a transaction ID.

This commit adds a callback that allows subsystems to construct their
own error messages, which can then correctly refer to a transaction
ID, multixid or whatever else is used to address the particular SLRU.

Author: Maxim Orlov <orlovmg@gmail.com>
Reviewed-by: Álvaro Herrera <alvherre@kurilemu.de>
Discussion: https://www.postgresql.org/message-id/CACG=ezZZfurhYV+66ceubxQAyWqv9vaUi0yoO4-t48OE5xc0DQ@mail.gmail.com
This commit is contained in:
Heikki Linnakangas 2026-03-13 16:21:06 +02:00
parent 723619eaa3
commit f9de9bf302
13 changed files with 187 additions and 75 deletions

View file

@ -112,6 +112,7 @@ static SlruCtlData XactCtlData;
static bool CLOGPagePrecedes(int64 page1, int64 page2);
static int clog_errdetail_for_io_error(const void *opaque_data);
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXact,
Oid oldestXactDb);
static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
@ -382,8 +383,7 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
* write-busy, since we don't care if the update reaches disk sooner than
* we think.
*/
slotno = SimpleLruReadPage(XactCtl, pageno, !XLogRecPtrIsValid(lsn),
xid);
slotno = SimpleLruReadPage(XactCtl, pageno, !XLogRecPtrIsValid(lsn), &xid);
/*
* Set the main transaction id, if any.
@ -744,7 +744,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly(XactCtl, pageno, xid);
slotno = SimpleLruReadPage_ReadOnly(XactCtl, pageno, &xid);
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
@ -808,6 +808,7 @@ CLOGShmemInit(void)
Assert(transaction_buffers != 0);
XactCtl->PagePrecedes = CLOGPagePrecedes;
XactCtl->errdetail_for_io_error = clog_errdetail_for_io_error;
SimpleLruInit(XactCtl, "transaction", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
"pg_xact", LWTRANCHE_XACT_BUFFER,
LWTRANCHE_XACT_SLRU, SYNC_HANDLER_CLOG, false);
@ -883,7 +884,7 @@ TrimCLOG(void)
int slotno;
char *byteptr;
slotno = SimpleLruReadPage(XactCtl, pageno, false, xid);
slotno = SimpleLruReadPage(XactCtl, pageno, false, &xid);
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
/* Zero so-far-unused positions in the current byte */
@ -1034,6 +1035,14 @@ CLOGPagePrecedes(int64 page1, int64 page2)
TransactionIdPrecedes(xid1, xid2 + CLOG_XACTS_PER_PAGE - 1));
}
static int
clog_errdetail_for_io_error(const void *opaque_data)
{
TransactionId xid = *(const TransactionId *) opaque_data;
return errdetail("Could not access commit status of transaction %u.", xid);
}
/*
* Write a TRUNCATE xlog record

View file

@ -115,6 +115,7 @@ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
ReplOriginId nodeid, int slotno);
static void error_commit_ts_disabled(void);
static bool CommitTsPagePrecedes(int64 page1, int64 page2);
static int commit_ts_errdetail_for_io_error(const void *opaque_data);
static void ActivateCommitTs(void);
static void DeactivateCommitTs(void);
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
@ -227,7 +228,7 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
for (i = 0; i < nsubxids; i++)
@ -332,7 +333,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
}
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, &xid);
memcpy(&entry,
CommitTsCtl->shared->page_buffer[slotno] +
SizeOfCommitTimestampEntry * entryno,
@ -551,6 +552,7 @@ CommitTsShmemInit(void)
Assert(commit_timestamp_buffers != 0);
CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
CommitTsCtl->errdetail_for_io_error = commit_ts_errdetail_for_io_error;
SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
"pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
LWTRANCHE_COMMITTS_SLRU,
@ -959,6 +961,13 @@ CommitTsPagePrecedes(int64 page1, int64 page2)
TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
}
static int
commit_ts_errdetail_for_io_error(const void *opaque_data)
{
TransactionId xid = *(const TransactionId *) opaque_data;
return errdetail("Could not access commit timestamp of transaction %u.", xid);
}
/*
* Write a TRUNCATE xlog record

View file

@ -313,8 +313,18 @@ static void mXactCachePut(MultiXactId multi, int nmembers,
MultiXactMember *members);
/* management of SLRU infrastructure */
/* opaque_data type for MultiXactMemberIoErrorDetail */
typedef struct MultiXactMemberSlruReadContext
{
MultiXactId multi;
MultiXactOffset offset;
} MultiXactMemberSlruReadContext;
static bool MultiXactOffsetPagePrecedes(int64 page1, int64 page2);
static bool MultiXactMemberPagePrecedes(int64 page1, int64 page2);
static int MultiXactOffsetIoErrorDetail(const void *opaque_data);
static int MultiXactMemberIoErrorDetail(const void *opaque_data);
static void ExtendMultiXactOffset(MultiXactId multi);
static void ExtendMultiXactMember(MultiXactOffset offset, int nmembers);
static void SetOldestOffset(void);
@ -829,14 +839,7 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
/*
* Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction"
* to complain about if there's any I/O error. This is kinda bogus, but
* since the errors will always give the full pathname, it should be clear
* enough that a MultiXactId is really involved. Perhaps someday we'll
* take the trouble to generalize the slru.c error reporting code.
*/
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi);
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, &multi);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
@ -865,7 +868,7 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, next_pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(MultiXactOffsetCtl, next_pageno, true, next);
slotno = SimpleLruReadPage(MultiXactOffsetCtl, next_pageno, true, &next);
next_offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
next_offptr += next_entryno;
}
@ -905,6 +908,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
if (pageno != prev_pageno)
{
MultiXactMemberSlruReadContext slru_read_context = {multi, offset};
/*
* MultiXactMember SLRU page is changed so check if this new page
* fall into the different SLRU bank then release the old bank's
@ -919,7 +924,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
LWLockAcquire(lock, LW_EXCLUSIVE);
prevlock = lock;
}
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true,
&slru_read_context);
prev_pageno = pageno;
}
@ -1244,7 +1250,7 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
LWLockAcquire(lock, LW_EXCLUSIVE);
/* read this multi's offset */
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi);
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, &multi);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
offset = *offptr;
@ -1282,7 +1288,7 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
LWLockAcquire(newlock, LW_EXCLUSIVE);
lock = newlock;
}
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact);
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, &tmpMXact);
}
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
@ -1331,6 +1337,7 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
if (pageno != prev_pageno)
{
MultiXactMemberSlruReadContext slru_read_context = {multi, offset};
LWLock *newlock;
/*
@ -1346,8 +1353,8 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
LWLockAcquire(newlock, LW_EXCLUSIVE);
lock = newlock;
}
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true,
&slru_read_context);
prev_pageno = pageno;
}
@ -1778,6 +1785,8 @@ MultiXactShmemInit(void)
MultiXactOffsetCtl->PagePrecedes = MultiXactOffsetPagePrecedes;
MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;
MultiXactOffsetCtl->errdetail_for_io_error = MultiXactOffsetIoErrorDetail;
MultiXactMemberCtl->errdetail_for_io_error = MultiXactMemberIoErrorDetail;
SimpleLruInit(MultiXactOffsetCtl,
"multixact_offset", multixact_offset_buffers, 0,
@ -1928,7 +1937,7 @@ TrimMultiXact(void)
if (entryno == 0 || nextMXact == FirstMultiXactId)
slotno = SimpleLruZeroPage(MultiXactOffsetCtl, pageno);
else
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact);
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, &nextMXact);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
@ -1956,6 +1965,7 @@ TrimMultiXact(void)
flagsoff = MXOffsetToFlagsOffset(offset);
if (flagsoff != 0)
{
MultiXactMemberSlruReadContext slru_read_context = {InvalidMultiXactId, offset};
int slotno;
TransactionId *xidptr;
int memberoff;
@ -1963,7 +1973,7 @@ TrimMultiXact(void)
LWLockAcquire(lock, LW_EXCLUSIVE);
memberoff = MXOffsetToMemberOffset(offset);
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset);
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, &slru_read_context);
xidptr = (TransactionId *)
(MultiXactMemberCtl->shared->page_buffer[slotno] + memberoff);
@ -2497,7 +2507,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result)
return false;
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly(MultiXactOffsetCtl, pageno, multi);
slotno = SimpleLruReadPage_ReadOnly(MultiXactOffsetCtl, pageno, &multi);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
offset = *offptr;
@ -2808,6 +2818,27 @@ MultiXactMemberPagePrecedes(int64 page1, int64 page2)
return page1 < page2;
}
static int
MultiXactOffsetIoErrorDetail(const void *opaque_data)
{
MultiXactId multixid = *(const MultiXactId *) opaque_data;
return errdetail("Could not access offset of multixact %u.", multixid);
}
static int
MultiXactMemberIoErrorDetail(const void *opaque_data)
{
const MultiXactMemberSlruReadContext *context = opaque_data;
if (MultiXactIdIsValid(context->multi))
return errdetail("Could not access member of multixact %u at offset %" PRIu64 ".",
context->multi, context->offset);
else
return errdetail("Could not access multixact member at offset %" PRIu64 ".",
context->offset);
}
/*
* Decide which of two MultiXactIds is earlier.
*

View file

@ -182,7 +182,8 @@ static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata);
static bool SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno);
static bool SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno,
SlruWriteAll fdata);
static void SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid);
static void SlruReportIOError(SlruCtl ctl, int64 pageno,
const void *opaque_data);
static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno);
static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
@ -260,6 +261,9 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS);
Assert(ctl->PagePrecedes != NULL);
Assert(ctl->errdetail_for_io_error != NULL);
shared = (SlruShared) ShmemInitStruct(name,
SimpleLruShmemSize(nslots, nlsns),
&found);
@ -516,8 +520,9 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* that modification of the page is safe. If write_ok is false then we
* will not return the page until it is not undergoing active I/O.
*
* The passed-in xid is used only for error reporting, and may be
* InvalidTransactionId if no specific xid is associated with the action.
* On error, the passed-in 'opaque_data' is passed to the
* 'errdetail_for_io_error' callback, to provide details on the operation that
* failed. It is only used for error reporting.
*
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
@ -526,7 +531,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
*/
int
SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
TransactionId xid)
const void *opaque_data)
{
SlruShared shared = ctl->shared;
LWLock *banklock = SimpleLruGetBankLock(ctl, pageno);
@ -602,7 +607,7 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
/* Now it's okay to ereport if we failed */
if (!ok)
SlruReportIOError(ctl, pageno, xid);
SlruReportIOError(ctl, pageno, opaque_data);
SlruRecentlyUsed(shared, slotno);
@ -618,8 +623,9 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
* The page number must correspond to an already-initialized page.
* The caller must intend only read-only access to the page.
*
* The passed-in xid is used only for error reporting, and may be
* InvalidTransactionId if no specific xid is associated with the action.
* On error, the passed-in 'opaque_data' is passed to the
* 'errdetail_for_io_error' callback, to provide details on the operation that
* failed. It is only used for error reporting.
*
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
@ -628,7 +634,7 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
* It is unspecified whether the lock will be shared or exclusive.
*/
int
SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, const void *opaque_data)
{
SlruShared shared = ctl->shared;
LWLock *banklock = SimpleLruGetBankLock(ctl, pageno);
@ -660,7 +666,7 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
LWLockRelease(banklock);
LWLockAcquire(banklock, LW_EXCLUSIVE);
return SimpleLruReadPage(ctl, pageno, true, xid);
return SimpleLruReadPage(ctl, pageno, true, opaque_data);
}
/*
@ -740,7 +746,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
/* Now it's okay to ereport if we failed */
if (!ok)
SlruReportIOError(ctl, pageno, InvalidTransactionId);
SlruReportIOError(ctl, pageno, NULL);
/* If part of a checkpoint, count this as a SLRU buffer written. */
if (fdata)
@ -794,14 +800,14 @@ SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
/* report error normally */
slru_errcause = SLRU_OPEN_FAILED;
slru_errno = errno;
SlruReportIOError(ctl, pageno, 0);
SlruReportIOError(ctl, pageno, NULL);
}
if ((endpos = lseek(fd, 0, SEEK_END)) < 0)
{
slru_errcause = SLRU_SEEK_FAILED;
slru_errno = errno;
SlruReportIOError(ctl, pageno, 0);
SlruReportIOError(ctl, pageno, NULL);
}
result = endpos >= (off_t) (offset + BLCKSZ);
@ -1071,7 +1077,7 @@ SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno, SlruWriteAll fdata)
* SlruPhysicalWritePage. Call this after cleaning up shared-memory state.
*/
static void
SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
SlruReportIOError(SlruCtl ctl, int64 pageno, const void *opaque_data)
{
int64 segno = pageno / SLRU_PAGES_PER_SEGMENT;
int rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
@ -1085,54 +1091,55 @@ SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
case SLRU_OPEN_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not access status of transaction %u", xid),
errdetail("Could not open file \"%s\": %m.", path)));
errmsg("could not open file \"%s\": %m", path),
opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_SEEK_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not access status of transaction %u", xid),
errdetail("Could not seek in file \"%s\" to offset %d: %m.",
path, offset)));
errmsg("could not seek in file \"%s\" to offset %d: %m",
path, offset),
opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_READ_FAILED:
if (errno)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not access status of transaction %u", xid),
errdetail("Could not read from file \"%s\" at offset %d: %m.",
path, offset)));
errmsg("could not read from file \"%s\" at offset %d: %m",
path, offset),
opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
else
ereport(ERROR,
(errmsg("could not access status of transaction %u", xid),
errdetail("Could not read from file \"%s\" at offset %d: read too few bytes.", path, offset)));
(errmsg("could not read from file \"%s\" at offset %d: read too few bytes",
path, offset),
opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_WRITE_FAILED:
if (errno)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not access status of transaction %u", xid),
errdetail("Could not write to file \"%s\" at offset %d: %m.",
path, offset)));
errmsg("Could not write to file \"%s\" at offset %d: %m",
path, offset),
opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
else
ereport(ERROR,
(errmsg("could not access status of transaction %u", xid),
errdetail("Could not write to file \"%s\" at offset %d: wrote too few bytes.",
path, offset)));
(errmsg("Could not write to file \"%s\" at offset %d: wrote too few bytes.",
path, offset),
opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_FSYNC_FAILED:
ereport(data_sync_elevel(ERROR),
(errcode_for_file_access(),
errmsg("could not access status of transaction %u", xid),
errdetail("Could not fsync file \"%s\": %m.",
path)));
errmsg("could not fsync file \"%s\": %m",
path),
opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
case SLRU_CLOSE_FAILED:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not access status of transaction %u", xid),
errdetail("Could not close file \"%s\": %m.",
path)));
errmsg("could not close file \"%s\": %m",
path),
opaque_data ? ctl->errdetail_for_io_error(opaque_data) : 0));
break;
default:
/* can't get here, we trust */
@ -1412,7 +1419,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
}
}
if (!ok)
SlruReportIOError(ctl, pageno, InvalidTransactionId);
SlruReportIOError(ctl, pageno, NULL);
/* Ensure that directory entries for new files are on disk. */
if (ctl->sync_handler != SYNC_HANDLER_NONE)

View file

@ -75,6 +75,7 @@ static SlruCtlData SubTransCtlData;
static bool SubTransPagePrecedes(int64 page1, int64 page2);
static int subtrans_errdetail_for_io_error(const void *opaque_data);
/*
@ -95,7 +96,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
lock = SimpleLruGetBankLock(SubTransCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid);
slotno = SimpleLruReadPage(SubTransCtl, pageno, true, &xid);
ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
ptr += entryno;
@ -135,7 +136,7 @@ SubTransGetParent(TransactionId xid)
/* lock is acquired by SimpleLruReadPage_ReadOnly */
slotno = SimpleLruReadPage_ReadOnly(SubTransCtl, pageno, xid);
slotno = SimpleLruReadPage_ReadOnly(SubTransCtl, pageno, &xid);
ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
ptr += entryno;
@ -240,6 +241,7 @@ SUBTRANSShmemInit(void)
Assert(subtransaction_buffers != 0);
SubTransCtl->PagePrecedes = SubTransPagePrecedes;
SubTransCtl->errdetail_for_io_error = subtrans_errdetail_for_io_error;
SimpleLruInit(SubTransCtl, "subtransaction", SUBTRANSShmemBuffers(), 0,
"pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER,
LWTRANCHE_SUBTRANS_SLRU, SYNC_HANDLER_NONE, false);
@ -419,3 +421,11 @@ SubTransPagePrecedes(int64 page1, int64 page2)
return (TransactionIdPrecedes(xid1, xid2) &&
TransactionIdPrecedes(xid1, xid2 + SUBTRANS_XACTS_PER_PAGE - 1));
}
static int
subtrans_errdetail_for_io_error(const void *opaque_data)
{
TransactionId xid = *(const TransactionId *) opaque_data;
return errdetail("Could not access subtransaction status of transaction %u.", xid);
}

View file

@ -570,6 +570,7 @@ bool Trace_notify = false;
int max_notify_queue_pages = 1048576;
/* local function prototypes */
static int asyncQueueErrdetailForIoError(const void *opaque_data);
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static inline void GlobalChannelKeyInit(GlobalChannelKey *key, Oid dboid,
@ -610,6 +611,15 @@ static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
static int
asyncQueueErrdetailForIoError(const void *opaque_data)
{
const QueuePosition *position = opaque_data;
return errdetail("Could not access async queue at page %" PRId64 ", offset %d.",
position->page, position->offset);
}
/*
* Compute the difference between two queue page numbers.
* Previously this function accounted for a wraparound.
@ -830,6 +840,7 @@ AsyncShmemInit(void)
* names are used in order to avoid wraparound.
*/
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
NotifyCtl->errdetail_for_io_error = asyncQueueErrdetailForIoError;
SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
"pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
SYNC_HANDLER_NONE, true);
@ -2068,8 +2079,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
if (QUEUE_POS_IS_ZERO(queue_head))
slotno = SimpleLruZeroPage(NotifyCtl, pageno);
else
slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
InvalidTransactionId);
slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &queue_head);
/* Note we mark the page dirty before writing in it */
NotifyCtl->shared->page_dirty[slotno] = true;
@ -2739,8 +2749,7 @@ asyncQueueProcessPageEntries(QueuePosition *current,
alignas(AsyncQueueEntry) char local_buf[QUEUE_PAGESIZE];
char *local_buf_end = local_buf;
slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
InvalidTransactionId);
slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, current);
page_buffer = NotifyCtl->shared->page_buffer[slotno];
do
@ -2998,8 +3007,7 @@ AsyncNotifyFreezeXids(TransactionId newFrozenXid)
lock = SimpleLruGetBankLock(NotifyCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
InvalidTransactionId);
slotno = SimpleLruReadPage(NotifyCtl, pageno, true, &pos);
page_buffer = NotifyCtl->shared->page_buffer[slotno];
curpage = pageno;
}

View file

@ -443,6 +443,7 @@ static void ReleaseRWConflict(RWConflict conflict);
static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
static bool SerialPagePrecedesLogically(int64 page1, int64 page2);
static int serial_errdetail_for_io_error(const void *opaque_data);
static void SerialInit(void);
static void SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
static SerCommitSeqNo SerialGetMinConflictCommitSeqNo(TransactionId xid);
@ -743,6 +744,14 @@ SerialPagePrecedesLogically(int64 page1, int64 page2)
TransactionIdPrecedes(xid1, xid2 + SERIAL_ENTRIESPERPAGE - 1));
}
static int
serial_errdetail_for_io_error(const void *opaque_data)
{
TransactionId xid = *(const TransactionId *) opaque_data;
return errdetail("Could not access serializable CSN of transaction %u.", xid);
}
#ifdef USE_ASSERT_CHECKING
static void
SerialPagePrecedesLogicallyUnitTests(void)
@ -812,6 +821,7 @@ SerialInit(void)
* Set up SLRU management of the pg_serial data.
*/
SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically;
SerialSlruCtl->errdetail_for_io_error = serial_errdetail_for_io_error;
SimpleLruInit(SerialSlruCtl, "serializable",
serializable_buffers, 0, "pg_serial",
LWTRANCHE_SERIAL_BUFFER, LWTRANCHE_SERIAL_SLRU,
@ -931,7 +941,7 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
else
{
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(SerialSlruCtl, targetPage, true, xid);
slotno = SimpleLruReadPage(SerialSlruCtl, targetPage, true, &xid);
}
SerialValue(slotno, xid) = minConflictCommitSeqNo;
@ -975,7 +985,7 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid)
* but will return with that lock held, which must then be released.
*/
slotno = SimpleLruReadPage_ReadOnly(SerialSlruCtl,
SerialPage(xid), xid);
SerialPage(xid), &xid);
val = SerialValue(slotno, xid);
LWLockRelease(SimpleLruGetBankLock(SerialSlruCtl, SerialPage(xid)));
return val;

View file

@ -13,6 +13,7 @@
#ifndef SLRU_H
#define SLRU_H
#include "access/transam.h"
#include "access/xlogdefs.h"
#include "storage/lwlock.h"
#include "storage/sync.h"
@ -141,6 +142,16 @@ typedef struct SlruCtlData
*/
bool (*PagePrecedes) (int64, int64);
/*
* Callback to provide more details on an I/O error. This is called as
* part of ereport(), and the callback function is expected to call
* errdetail() to provide more context on the SLRU access.
*
* The opaque_data argument here is the argument that was passed to the
* SimpleLruReadPage() function.
*/
int (*errdetail_for_io_error) (const void *opaque_data);
/*
* Dir is set during SimpleLruInit and does not change thereafter. Since
* it's always the same, it doesn't need to be in shared memory.
@ -174,9 +185,9 @@ extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
extern int SimpleLruZeroPage(SlruCtl ctl, int64 pageno);
extern void SimpleLruZeroAndWritePage(SlruCtl ctl, int64 pageno);
extern int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
TransactionId xid);
const void *opaque_data);
extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno,
TransactionId xid);
const void *opaque_data);
extern void SimpleLruWritePage(SlruCtl ctl, int slotno);
extern void SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied);
#ifdef USE_ASSERT_CHECKING

View file

@ -23,6 +23,10 @@ SELECT test_slru_page_exists(12345);
t
(1 row)
-- Test read failure
SELECT test_slru_page_read(54321, false, '123'::xid);
ERROR: could not open file "pg_test_slru/0000000000006A1": No such file or directory
DETAIL: Could not access test_slru entry 123.
-- 48 extra pages
SELECT count(test_slru_page_write(a, 'Test SLRU'))
FROM generate_series(12346, 12393, 1) as a;

View file

@ -5,6 +5,9 @@ SELECT test_slru_page_write(12345, 'Test SLRU');
SELECT test_slru_page_read(12345);
SELECT test_slru_page_exists(12345);
-- Test read failure
SELECT test_slru_page_read(54321, false, '123'::xid);
-- 48 extra pages
SELECT count(test_slru_page_write(a, 'Test SLRU'))
FROM generate_series(12346, 12393, 1) as a;

View file

@ -7,7 +7,7 @@ CREATE OR REPLACE FUNCTION test_slru_page_writeall() RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_page_writeall' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_sync(bigint) RETURNS VOID
AS 'MODULE_PATHNAME', 'test_slru_page_sync' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_read(bigint, bool DEFAULT true) RETURNS text
CREATE OR REPLACE FUNCTION test_slru_page_read(bigint, bool DEFAULT true, xid DEFAULT '0') RETURNS text
AS 'MODULE_PATHNAME', 'test_slru_page_read' LANGUAGE C;
CREATE OR REPLACE FUNCTION test_slru_page_readonly(bigint) RETURNS text
AS 'MODULE_PATHNAME', 'test_slru_page_readonly' LANGUAGE C;

View file

@ -93,14 +93,14 @@ test_slru_page_read(PG_FUNCTION_ARGS)
{
int64 pageno = PG_GETARG_INT64(0);
bool write_ok = PG_GETARG_BOOL(1);
TransactionId xid = PG_GETARG_TRANSACTIONID(2);
char *data = NULL;
int slotno;
LWLock *lock = SimpleLruGetBankLock(TestSlruCtl, pageno);
/* find page in buffers, reading it if necessary */
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(TestSlruCtl, pageno,
write_ok, InvalidTransactionId);
slotno = SimpleLruReadPage(TestSlruCtl, pageno, write_ok, &xid);
data = (char *) TestSlruCtl->shared->page_buffer[slotno];
LWLockRelease(lock);
@ -118,7 +118,7 @@ test_slru_page_readonly(PG_FUNCTION_ARGS)
/* find page in buffers, reading it if necessary */
slotno = SimpleLruReadPage_ReadOnly(TestSlruCtl,
pageno,
InvalidTransactionId);
NULL);
Assert(LWLockHeldByMe(lock));
data = (char *) TestSlruCtl->shared->page_buffer[slotno];
LWLockRelease(lock);
@ -210,6 +210,14 @@ test_slru_page_precedes_logically(int64 page1, int64 page2)
return page1 < page2;
}
static int
test_slru_errdetail_for_io_error(const void *opaque_data)
{
TransactionId xid = *(const TransactionId *) opaque_data;
return errdetail("Could not access test_slru entry %u.", xid);
}
static void
test_slru_shmem_startup(void)
{
@ -245,6 +253,7 @@ test_slru_shmem_startup(void)
}
TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically;
TestSlruCtl->errdetail_for_io_error = test_slru_errdetail_for_io_error;
SimpleLruInit(TestSlruCtl, "TestSLRU",
NUM_TEST_BUFFERS, 0, slru_dir_name,
test_buffer_tranche_id, test_tranche_id, SYNC_HANDLER_NONE,

View file

@ -1747,6 +1747,7 @@ MultiSortSupport
MultiSortSupportData
MultiXactId
MultiXactMember
MultiXactMemberSlruReadContext
MultiXactOffset
MultiXactOffset32
MultiXactStateData