diff --git a/src/bin/pg_waldump/archive_waldump.c b/src/bin/pg_waldump/archive_waldump.c index b078c2d6960..067cb85c36b 100644 --- a/src/bin/pg_waldump/archive_waldump.c +++ b/src/bin/pg_waldump/archive_waldump.c @@ -89,7 +89,7 @@ typedef struct astreamer_waldump static ArchivedWALFile *get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo); -static int read_archive_file(XLogDumpPrivate *privateInfo, Size count); +static bool read_archive_file(XLogDumpPrivate *privateInfo, Size count); static void setup_tmpwal_dir(const char *waldir); static void cleanup_tmpwal_dir_atexit(void); @@ -128,8 +128,7 @@ init_archive_reader(XLogDumpPrivate *privateInfo, astreamer *streamer; ArchivedWALFile *entry = NULL; XLogLongPageHeader longhdr; - XLogSegNo segno; - TimeLineID timeline; + ArchivedWAL_iterator iter; /* Open tar archive and store its file descriptor */ fd = open_file_in_directory(privateInfo->archive_dir, @@ -139,6 +138,7 @@ init_archive_reader(XLogDumpPrivate *privateInfo, pg_fatal("could not open file \"%s\"", privateInfo->archive_name); privateInfo->archive_fd = fd; + privateInfo->archive_fd_eof = false; streamer = astreamer_waldump_new(privateInfo); @@ -172,17 +172,27 @@ init_archive_reader(XLogDumpPrivate *privateInfo, privateInfo->archive_wal_htab = ArchivedWAL_create(8, NULL); /* - * Read until we have at least one full WAL page (XLOG_BLCKSZ bytes) from - * the first WAL segment in the archive so we can extract the WAL segment - * size from the long page header. + * Read until we have at least one WAL segment with enough data to extract + * the WAL segment size from the long page header. + * + * We must not rely on cur_file here, because it can become NULL if a + * member trailer is processed during a read_archive_file() call. Instead, + * scan the hash table after each read to find any entry with sufficient + * data. */ - while (entry == NULL || entry->buf->len < XLOG_BLCKSZ) + while (entry == NULL) { - if (read_archive_file(privateInfo, XLOG_BLCKSZ) == 0) + if (!read_archive_file(privateInfo, XLOG_BLCKSZ)) pg_fatal("could not find WAL in archive \"%s\"", privateInfo->archive_name); - entry = privateInfo->cur_file; + ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter); + while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab, + &iter)) != NULL) + { + if (entry->read_len >= sizeof(XLogLongPageHeaderData)) + break; + } } /* Extract the WAL segment size from the long page header */ @@ -213,18 +223,25 @@ init_archive_reader(XLogDumpPrivate *privateInfo, privateInfo->segsize); /* - * This WAL record was fetched before the filtering parameters - * (start_segno and end_segno) were fully initialized. Perform the - * relevance check against the user-provided range now; if the WAL falls - * outside this range, remove it from the hash table. Subsequent WAL will - * be filtered automatically by the archive streamer using the updated - * start_segno and end_segno values. + * Now that we have initialized the filtering parameters (start_segno and + * end_segno), we can discard any already-loaded WAL hash table entries + * for segments we don't actually need. Subsequent WAL will be filtered + * automatically by the archive streamer using the updated start_segno and + * end_segno values. */ - XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize); - if (privateInfo->timeline != timeline || - privateInfo->start_segno > segno || - privateInfo->end_segno < segno) - free_archive_wal_entry(entry->fname, privateInfo); + ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter); + while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab, + &iter)) != NULL) + { + XLogSegNo segno; + TimeLineID timeline; + + XLogFromFileName(entry->fname, &timeline, &segno, privateInfo->segsize); + if (privateInfo->timeline != timeline || + privateInfo->start_segno > segno || + privateInfo->end_segno < segno) + free_archive_wal_entry(entry->fname, privateInfo); + } } /* @@ -236,9 +253,10 @@ free_archive_reader(XLogDumpPrivate *privateInfo) /* * NB: Normally, astreamer_finalize() is called before astreamer_free() to * flush any remaining buffered data or to ensure the end of the tar - * archive is reached. However, when decoding WAL, once we hit the end - * LSN, any remaining buffered data or unread portion of the archive can - * be safely ignored. + * archive is reached. read_archive_file() may have done so. However, + * when decoding WAL we can stop once we hit the end LSN, so we may never + * have read all of the input file. In that case any remaining buffered + * data or unread portion of the archive can be safely ignored. */ astreamer_free(privateInfo->archive_streamer); @@ -384,7 +402,7 @@ read_archive_wal_page(XLogDumpPrivate *privateInfo, XLogRecPtr targetPagePtr, fname, privateInfo->archive_name, (long long int) (count - nbytes), (long long int) count); - if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0) + if (!read_archive_file(privateInfo, READ_CHUNK_SIZE)) pg_fatal("unexpected end of archive \"%s\" while reading \"%s\": read %lld of %lld bytes", privateInfo->archive_name, fname, (long long int) (count - nbytes), @@ -446,30 +464,25 @@ free_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo) /* * Returns the archived WAL entry from the hash table if it already exists. * Otherwise, reads more data from the archive until the requested entry is - * found. If the archive streamer is reading a WAL file from the archive that + * found. If the archive streamer reads a WAL file from the archive that * is not currently needed, that data is spilled to a temporary file for later * retrieval. + * + * Note that the returned entry might not have been completely read from + * the archive yet. */ static ArchivedWALFile * get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo) { - ArchivedWALFile *entry = NULL; - FILE *write_fp = NULL; - - /* - * Search the hash table first. If the entry is found, return it. - * Otherwise, the requested WAL entry hasn't been read from the archive - * yet; invoke the archive streamer to fetch it. - */ while (1) { + ArchivedWALFile *entry; + ArchivedWAL_iterator iter; + /* - * Search hash table. - * - * We perform the search inside the loop because a single iteration of - * the archive reader may decompress and extract multiple files into - * the hash table. One of these newly added files could be the one we - * are seeking. + * Search the hash table first. If the entry is found, return it. + * Otherwise, the requested WAL entry hasn't been read from the + * archive yet; we must invoke the archive streamer to fetch it. */ entry = ArchivedWAL_lookup(privateInfo->archive_wal_htab, fname); @@ -477,71 +490,62 @@ get_archive_wal_entry(const char *fname, XLogDumpPrivate *privateInfo) return entry; /* - * Capture the current entry before calling read_archive_file(), - * because cur_file may advance to a new segment during streaming. We - * hold this reference so we can flush any remaining buffer data and - * close the write handle once we detect that cur_file has moved on. + * Before loading more data, scan the hash table to see if we have + * loaded any files we don't need yet. If so, spill their data to + * disk to conserve memory space. But don't try to spill a + * partially-read file; it's not worth the complication. */ - entry = privateInfo->cur_file; - - /* - * Fetch more data either when no current file is being tracked or - * when its buffer has been fully flushed to the temporary file. - */ - if (entry == NULL || entry->buf->len == 0) + ArchivedWAL_start_iterate(privateInfo->archive_wal_htab, &iter); + while ((entry = ArchivedWAL_iterate(privateInfo->archive_wal_htab, + &iter)) != NULL) { - if (read_archive_file(privateInfo, READ_CHUNK_SIZE) == 0) - break; /* archive file ended */ - } + FILE *write_fp; - /* - * Archive streamer is reading a non-WAL file or an irrelevant WAL - * file. - */ - if (entry == NULL) - continue; + /* OK to spill? */ + if (entry->spilled) + continue; /* already spilled */ + if (entry == privateInfo->cur_file) + continue; /* still being read */ - /* - * The streamer is producing a WAL segment that isn't the one asked - * for; it must be arriving out of order. Spill its data to disk so - * it can be read back when needed. - */ - Assert(strcmp(fname, entry->fname) != 0); - - /* Create a temporary file if one does not already exist */ - if (!entry->spilled) - { + /* Write out the completed WAL file contents to a temp file. */ write_fp = prepare_tmp_write(entry->fname, privateInfo); + perform_tmp_write(entry->fname, entry->buf, write_fp); + fclose(write_fp); + + /* resetStringInfo won't release storage, so delete/recreate. */ + destroyStringInfo(entry->buf); + entry->buf = makeStringInfo(); entry->spilled = true; } - /* Flush data from the buffer to the file */ - perform_tmp_write(entry->fname, entry->buf, write_fp); - resetStringInfo(entry->buf); - /* - * If cur_file changed since we captured entry above, the archive - * streamer has finished this segment and moved on. Close its spill - * file handle so data is flushed to disk before the next segment - * starts writing to a different handle. + * Read more data. If we reach EOF, the desired file is not present. */ - if (entry != privateInfo->cur_file && write_fp != NULL) - { - fclose(write_fp); - write_fp = NULL; - } + if (!read_archive_file(privateInfo, READ_CHUNK_SIZE)) + pg_fatal("could not find WAL \"%s\" in archive \"%s\"", + fname, privateInfo->archive_name); } - - /* Requested WAL segment not found */ - pg_fatal("could not find WAL \"%s\" in archive \"%s\"", - fname, privateInfo->archive_name); } /* * Reads a chunk from the archive file and passes it through the streamer * pipeline for decompression (if needed) and tar member extraction. + * + * count is the maximum amount to try to read this time. Note that it's + * measured in raw file bytes, and may have little to do with how much + * comes out of decompression/extraction. + * + * Returns true if successful, false if there is no more data. + * + * Callers must be aware that a single call may trigger multiple callbacks + * in astreamer_waldump_content, so privateInfo->cur_file can change value + * (or become NULL) during a call. In particular, cur_file is set to NULL + * when the ASTREAMER_MEMBER_TRAILER callback fires at the end of a tar + * member; it is then set to a new entry when the next WAL member's + * ASTREAMER_MEMBER_HEADER callback fires, which may or may not happen + * within the same call. */ -static int +static bool read_archive_file(XLogDumpPrivate *privateInfo, Size count) { int rc; @@ -549,6 +553,11 @@ read_archive_file(XLogDumpPrivate *privateInfo, Size count) /* The read request must not exceed the allocated buffer size. */ Assert(privateInfo->archive_read_buf_size >= count); + /* Fail if we already reached EOF in a prior call. */ + if (privateInfo->archive_fd_eof) + return false; + + /* Try to read some more data. */ rc = read(privateInfo->archive_fd, privateInfo->archive_read_buf, count); if (rc < 0) pg_fatal("could not read file \"%s\": %m", @@ -562,8 +571,19 @@ read_archive_file(XLogDumpPrivate *privateInfo, Size count) astreamer_content(privateInfo->archive_streamer, NULL, privateInfo->archive_read_buf, rc, ASTREAMER_UNKNOWN); + else + { + /* + * We reached EOF, but there is probably still data queued in the + * astreamer pipeline's buffers. Flush it out to ensure that we + * process everything. + */ + astreamer_finalize(privateInfo->archive_streamer); + /* Set flag to ensure we don't finalize more than once. */ + privateInfo->archive_fd_eof = true; + } - return rc; + return true; } /* diff --git a/src/bin/pg_waldump/pg_waldump.h b/src/bin/pg_waldump/pg_waldump.h index 36893624f53..ca0dfd97168 100644 --- a/src/bin/pg_waldump/pg_waldump.h +++ b/src/bin/pg_waldump/pg_waldump.h @@ -35,6 +35,7 @@ typedef struct XLogDumpPrivate char *archive_dir; char *archive_name; /* Tar archive filename */ int archive_fd; /* File descriptor for the open tar file */ + bool archive_fd_eof; /* Have we reached EOF on archive_fd? */ astreamer *archive_streamer; char *archive_read_buf; /* Reusable read buffer for archive I/O */ @@ -43,7 +44,14 @@ typedef struct XLogDumpPrivate Size archive_read_buf_size; #endif - /* What the archive streamer is currently reading */ + /* + * The buffer for the WAL file the archive streamer is currently reading, + * or NULL if none. It is quite risky to examine this anywhere except in + * astreamer_waldump_content(), since it can change multiple times during + * a single read_archive_file() call. However, it is safe to assume that + * if cur_file is different from a particular ArchivedWALFile of interest, + * then the archive streamer has finished reading that file. + */ struct ArchivedWALFile *cur_file; /*