mirror of
https://github.com/postgres/postgres.git
synced 2026-03-25 03:45:15 -04:00
Fix assorted bugs in archive_waldump.c.
1. archive_waldump.c called astreamer_finalize() nowhere. This meant that any data retained in decompression buffers at the moment we detect archive EOF would never reach astreamer_waldump_content(), resulting in surprising failures if we actually need the last few bytes of the archive file. To fix that, make read_archive_file() do the finalize once it detects EOF. Change its API to return a boolean "yes there's more data" rather than the entirely-misleading raw count of bytes read. 2. init_archive_reader() relied on privateInfo->cur_file to track which WAL segment was being read, but cur_file can become NULL if a member trailer is processed during a read_archive_file() call. This could cause unreproducible "could not find WAL in archive" failures, particularly with compressed archives where all the WAL data fits in a small number of compressed bytes. Fix by scanning the hash table after each read to find any cached WAL segment with sufficient data, instead of depending on cur_file. Also reduce the minimum data requirement from XLOG_BLCKSZ to sizeof(XLogLongPageHeaderData), since we only need the long page header to extract the segment size. We likewise need to fix init_archive_reader() to scan the whole hash table for irrelevant entries, since we might have already loaded more than one entry when the data is compressible enough. 3. get_archive_wal_entry() relied on tracking cur_file to identify WAL hash table entries that need to be spilled to disk. However, this can't work for entries that are read completely within a single read_archive_file call: the caller will never see cur_file pointing at such an entry. Instead, scan the WAL hash table to find entries we should spill. This also fixes a buglet that any hash table entries completely loaded during init_archive_reader were never considered for spilling. Also, simplify the logic tremendously by not attempting to spill entries that haven't been read fully. I am not convinced that the old logic handled that correctly in every path, and it's really not worth the complication and risk of bugs to try to spill entries on the fly. We can just write them in a single go once they are no longer the cur_file. 4. Fix a rather critical performance problem: the code thought that resetStringInfo() will reclaim storage, but it doesn't. So by the end of the run we'd have consumed storage space equal to the total amount of WAL read, negating all the effort of the spill logic. Also document the contract that cur_file can change (or become NULL) during a single read_archive_file() call, since the decompression pipeline may produce enough output to trigger multiple astreamer callbacks. Author: Tom Lane <tgl@sss.pgh.pa.us> Co-authored-by: Andrew Dunstan <andrew@dunslane.net> Discussion: https://postgr.es/m/2178517.1774064942@sss.pgh.pa.us
This commit is contained in:
parent
5868372bbf
commit
860359ea02
2 changed files with 117 additions and 89 deletions
|
|
@ -89,7 +89,7 @@ typedef struct astreamer_waldump
|
|||
|
||||
static ArchivedWALFile *get_archive_wal_entry(const char *fname,
|
||||
XLogDumpPrivate *privateInfo);
|
||||
static int read_archive_file(XLogDumpPrivate *privateInfo, Size count);
|
||||
static bool read_archive_file(XLogDumpPrivate *privateInfo, Size count);
|
||||
static void setup_tmpwal_dir(const char *waldir);
|
||||
static void cleanup_tmpwal_dir_atexit(void);
|
||||
|
||||
|
|
@ -128,8 +128,7 @@ init_archive_reader(XLogDumpPrivate *privateInfo,
|
|||
astreamer *streamer;
|
||||
ArchivedWALFile *entry = NULL;
|
||||
XLogLongPageHeader longhdr;
|
||||
XLogSegNo segno;
|
||||
TimeLineID timeline;
|
||||
ArchivedWAL_iterator iter;
|
||||
|
||||
/* Open tar archive and store its file descriptor */
|
||||
fd = open_file_in_directory(privateInfo->archive_dir,
|
||||
|
|
@ -139,6 +138,7 @@ init_archive_reader(XLogDumpPrivate *privateInfo,
|
|||
pg_fatal("could not open file \"%s\"", privateInfo->archive_name);
|
||||
|
||||
privateInfo->archive_fd = fd;
|
||||
privateInfo->archive_fd_eof = false;
|
||||
|
||||
streamer = astreamer_waldump_new(privateInfo);
|
||||
|
||||
|
|
@ -172,17 +172,27 @@ init_archive_reader(XLogDumpPrivate *privateInfo,
|
|||
privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL);
|
||||
|
||||
/*
|
||||
* Read until we have at least one full WAL page (XLOG_BLCKSZ bytes) from
|
||||
* the first WAL segment in the archive so we can extract the WAL segment
|
||||
* size from the long page header.
|
||||
* Read until we have at least one WAL segment with enough data to extract
|
||||
* the WAL segment size from the long page header.
|
||||
*
|
||||
* We must not rely on cur_file here, because it can become NULL if a
|
||||
* member trailer is processed during a read_archive_file() call. Instead,
|
||||
* scan the hash table after each read to find any entry with sufficient
|
||||
* data.
|
||||
*/
|
||||
while (entry == NULL || entry->buf->len < XLOG_BLCKSZ)
|
||||
while (entry == NULL)
|
||||
{
|
||||
if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0)
|
||||
if (!read_archive_file(privateInfo, XLOG_BLCKSZ))
|
||||
pg_fatal("could not find WAL in archive \"%s\"",
|
||||
privateInfo->archive_name);
|
||||
|
||||
entry = privateInfo->cur_file;
|
||||
ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
|
||||
while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
|
||||
&iter)) != NULL)
|
||||
{
|
||||
if (entry->read_len >= sizeof(XLogLongPageHeaderData))
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/* Extract the WAL segment size from the long page header */
|
||||
|
|
@ -213,18 +223,25 @@ init_archive_reader(XLogDumpPrivate *privateInfo,
|
|||
privateInfo->segsize);
|
||||
|
||||
/*
|
||||
* This WAL record was fetched before the filtering parameters
|
||||
* (start_segno and end_segno) were fully initialized. Perform the
|
||||
* relevance check against the user-provided range now; if the WAL falls
|
||||
* outside this range, remove it from the hash table. Subsequent WAL will
|
||||
* be filtered automatically by the archive streamer using the updated
|
||||
* start_segno and end_segno values.
|
||||
* Now that we have initialized the filtering parameters (start_segno and
|
||||
* end_segno), we can discard any already-loaded WAL hash table entries
|
||||
* for segments we don't actually need. Subsequent WAL will be filtered
|
||||
* automatically by the archive streamer using the updated start_segno and
|
||||
* end_segno values.
|
||||
*/
|
||||
XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
|
||||
if (privateInfo->timeline != timeline ||
|
||||
privateInfo->start_segno > segno ||
|
||||
privateInfo->end_segno < segno)
|
||||
free_archive_wal_entry(entry->fname, privateInfo);
|
||||
ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
|
||||
while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
|
||||
&iter)) != NULL)
|
||||
{
|
||||
XLogSegNo segno;
|
||||
TimeLineID timeline;
|
||||
|
||||
XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize);
|
||||
if (privateInfo->timeline != timeline ||
|
||||
privateInfo->start_segno > segno ||
|
||||
privateInfo->end_segno < segno)
|
||||
free_archive_wal_entry(entry->fname, privateInfo);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -236,9 +253,10 @@ free_archive_reader(XLogDumpPrivate *privateInfo)
|
|||
/*
|
||||
* NB: Normally, astreamer_finalize() is called before astreamer_free() to
|
||||
* flush any remaining buffered data or to ensure the end of the tar
|
||||
* archive is reached. However, when decoding WAL, once we hit the end
|
||||
* LSN, any remaining buffered data or unread portion of the archive can
|
||||
* be safely ignored.
|
||||
* archive is reached. read_archive_file() may have done so. However,
|
||||
* when decoding WAL we can stop once we hit the end LSN, so we may never
|
||||
* have read all of the input file. In that case any remaining buffered
|
||||
* data or unread portion of the archive can be safely ignored.
|
||||
*/
|
||||
astreamer_free(privateInfo->archive_streamer);
|
||||
|
||||
|
|
@ -384,7 +402,7 @@ read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr,
|
|||
fname, privateInfo->archive_name,
|
||||
(long long int) (count - nbytes),
|
||||
(long long int) count);
|
||||
if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
|
||||
if (!read_archive_file(privateInfo, READ_CHUNK_SIZE))
|
||||
pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes",
|
||||
privateInfo->archive_name, fname,
|
||||
(long long int) (count - nbytes),
|
||||
|
|
@ -446,30 +464,25 @@ free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
|
|||
/*
|
||||
* Returns the archived WAL entry from the hash table if it already exists.
|
||||
* Otherwise, reads more data from the archive until the requested entry is
|
||||
* found. If the archive streamer is reading a WAL file from the archive that
|
||||
* found. If the archive streamer reads a WAL file from the archive that
|
||||
* is not currently needed, that data is spilled to a temporary file for later
|
||||
* retrieval.
|
||||
*
|
||||
* Note that the returned entry might not have been completely read from
|
||||
* the archive yet.
|
||||
*/
|
||||
static ArchivedWALFile *
|
||||
get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
|
||||
{
|
||||
ArchivedWALFile *entry = NULL;
|
||||
FILE *write_fp = NULL;
|
||||
|
||||
/*
|
||||
* Search the hash table first. If the entry is found, return it.
|
||||
* Otherwise, the requested WAL entry hasn't been read from the archive
|
||||
* yet; invoke the archive streamer to fetch it.
|
||||
*/
|
||||
while (1)
|
||||
{
|
||||
ArchivedWALFile *entry;
|
||||
ArchivedWAL_iterator iter;
|
||||
|
||||
/*
|
||||
* Search hash table.
|
||||
*
|
||||
* We perform the search inside the loop because a single iteration of
|
||||
* the archive reader may decompress and extract multiple files into
|
||||
* the hash table. One of these newly added files could be the one we
|
||||
* are seeking.
|
||||
* Search the hash table first. If the entry is found, return it.
|
||||
* Otherwise, the requested WAL entry hasn't been read from the
|
||||
* archive yet; we must invoke the archive streamer to fetch it.
|
||||
*/
|
||||
entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname);
|
||||
|
||||
|
|
@ -477,71 +490,62 @@ get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo)
|
|||
return entry;
|
||||
|
||||
/*
|
||||
* Capture the current entry before calling read_archive_file(),
|
||||
* because cur_file may advance to a new segment during streaming. We
|
||||
* hold this reference so we can flush any remaining buffer data and
|
||||
* close the write handle once we detect that cur_file has moved on.
|
||||
* Before loading more data, scan the hash table to see if we have
|
||||
* loaded any files we don't need yet. If so, spill their data to
|
||||
* disk to conserve memory space. But don't try to spill a
|
||||
* partially-read file; it's not worth the complication.
|
||||
*/
|
||||
entry = privateInfo->cur_file;
|
||||
|
||||
/*
|
||||
* Fetch more data either when no current file is being tracked or
|
||||
* when its buffer has been fully flushed to the temporary file.
|
||||
*/
|
||||
if (entry == NULL || entry->buf->len == 0)
|
||||
ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter);
|
||||
while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab,
|
||||
&iter)) != NULL)
|
||||
{
|
||||
if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0)
|
||||
break; /* archive file ended */
|
||||
}
|
||||
FILE *write_fp;
|
||||
|
||||
/*
|
||||
* Archive streamer is reading a non-WAL file or an irrelevant WAL
|
||||
* file.
|
||||
*/
|
||||
if (entry == NULL)
|
||||
continue;
|
||||
/* OK to spill? */
|
||||
if (entry->spilled)
|
||||
continue; /* already spilled */
|
||||
if (entry == privateInfo->cur_file)
|
||||
continue; /* still being read */
|
||||
|
||||
/*
|
||||
* The streamer is producing a WAL segment that isn't the one asked
|
||||
* for; it must be arriving out of order. Spill its data to disk so
|
||||
* it can be read back when needed.
|
||||
*/
|
||||
Assert(strcmp(fname, entry->fname) != 0);
|
||||
|
||||
/* Create a temporary file if one does not already exist */
|
||||
if (!entry->spilled)
|
||||
{
|
||||
/* Write out the completed WAL file contents to a temp file. */
|
||||
write_fp = prepare_tmp_write(entry->fname, privateInfo);
|
||||
perform_tmp_write(entry->fname, entry->buf, write_fp);
|
||||
fclose(write_fp);
|
||||
|
||||
/* resetStringInfo won't release storage, so delete/recreate. */
|
||||
destroyStringInfo(entry->buf);
|
||||
entry->buf = makeStringInfo();
|
||||
entry->spilled = true;
|
||||
}
|
||||
|
||||
/* Flush data from the buffer to the file */
|
||||
perform_tmp_write(entry->fname, entry->buf, write_fp);
|
||||
resetStringInfo(entry->buf);
|
||||
|
||||
/*
|
||||
* If cur_file changed since we captured entry above, the archive
|
||||
* streamer has finished this segment and moved on. Close its spill
|
||||
* file handle so data is flushed to disk before the next segment
|
||||
* starts writing to a different handle.
|
||||
* Read more data. If we reach EOF, the desired file is not present.
|
||||
*/
|
||||
if (entry != privateInfo->cur_file && write_fp != NULL)
|
||||
{
|
||||
fclose(write_fp);
|
||||
write_fp = NULL;
|
||||
}
|
||||
if (!read_archive_file(privateInfo, READ_CHUNK_SIZE))
|
||||
pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
|
||||
fname, privateInfo->archive_name);
|
||||
}
|
||||
|
||||
/* Requested WAL segment not found */
|
||||
pg_fatal("could not find WAL \"%s\" in archive \"%s\"",
|
||||
fname, privateInfo->archive_name);
|
||||
}
|
||||
|
||||
/*
|
||||
* Reads a chunk from the archive file and passes it through the streamer
|
||||
* pipeline for decompression (if needed) and tar member extraction.
|
||||
*
|
||||
* count is the maximum amount to try to read this time. Note that it's
|
||||
* measured in raw file bytes, and may have little to do with how much
|
||||
* comes out of decompression/extraction.
|
||||
*
|
||||
* Returns true if successful, false if there is no more data.
|
||||
*
|
||||
* Callers must be aware that a single call may trigger multiple callbacks
|
||||
* in astreamer_waldump_content, so privateInfo->cur_file can change value
|
||||
* (or become NULL) during a call. In particular, cur_file is set to NULL
|
||||
* when the ASTREAMER_MEMBER_TRAILER callback fires at the end of a tar
|
||||
* member; it is then set to a new entry when the next WAL member's
|
||||
* ASTREAMER_MEMBER_HEADER callback fires, which may or may not happen
|
||||
* within the same call.
|
||||
*/
|
||||
static int
|
||||
static bool
|
||||
read_archive_file(XLogDumpPrivate *privateInfo, Size count)
|
||||
{
|
||||
int rc;
|
||||
|
|
@ -549,6 +553,11 @@ read_archive_file(XLogDumpPrivate *privateInfo, Size count)
|
|||
/* The read request must not exceed the allocated buffer size. */
|
||||
Assert(privateInfo->archive_read_buf_size >= count);
|
||||
|
||||
/* Fail if we already reached EOF in a prior call. */
|
||||
if (privateInfo->archive_fd_eof)
|
||||
return false;
|
||||
|
||||
/* Try to read some more data. */
|
||||
rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf, count);
|
||||
if (rc < 0)
|
||||
pg_fatal("could not read file \"%s\": %m",
|
||||
|
|
@ -562,8 +571,19 @@ read_archive_file(XLogDumpPrivate *privateInfo, Size count)
|
|||
astreamer_content(privateInfo->archive_streamer, NULL,
|
||||
privateInfo->archive_read_buf, rc,
|
||||
ASTREAMER_UNKNOWN);
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We reached EOF, but there is probably still data queued in the
|
||||
* astreamer pipeline's buffers. Flush it out to ensure that we
|
||||
* process everything.
|
||||
*/
|
||||
astreamer_finalize(privateInfo->archive_streamer);
|
||||
/* Set flag to ensure we don't finalize more than once. */
|
||||
privateInfo->archive_fd_eof = true;
|
||||
}
|
||||
|
||||
return rc;
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ typedef struct XLogDumpPrivate
|
|||
char *archive_dir;
|
||||
char *archive_name; /* Tar archive filename */
|
||||
int archive_fd; /* File descriptor for the open tar file */
|
||||
bool archive_fd_eof; /* Have we reached EOF on archive_fd? */
|
||||
|
||||
astreamer *archive_streamer;
|
||||
char *archive_read_buf; /* Reusable read buffer for archive I/O */
|
||||
|
|
@ -43,7 +44,14 @@ typedef struct XLogDumpPrivate
|
|||
Size archive_read_buf_size;
|
||||
#endif
|
||||
|
||||
/* What the archive streamer is currently reading */
|
||||
/*
|
||||
* The buffer for the WAL file the archive streamer is currently reading,
|
||||
* or NULL if none. It is quite risky to examine this anywhere except in
|
||||
* astreamer_waldump_content(), since it can change multiple times during
|
||||
* a single read_archive_file() call. However, it is safe to assume that
|
||||
* if cur_file is different from a particular ArchivedWALFile of interest,
|
||||
* then the archive streamer has finished reading that file.
|
||||
*/
|
||||
struct ArchivedWALFile *cur_file;
|
||||
|
||||
/*
|
||||
|
|
|
|||
Loading…
Reference in a new issue