postgresql/src/include/replication/logical.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

155 lines
4.5 KiB
C
Raw Normal View History

/*-------------------------------------------------------------------------
* logical.h
* PostgreSQL logical decoding coordination
*
* Copyright (c) 2012-2023, PostgreSQL Global Development Group
*
*-------------------------------------------------------------------------
*/
#ifndef LOGICAL_H
#define LOGICAL_H
#include "access/xlog.h"
#include "access/xlogreader.h"
#include "replication/output_plugin.h"
#include "replication/slot.h"
struct LogicalDecodingContext;
typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr,
XLogRecPtr Ptr,
TransactionId xid,
bool last_write
);
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
XLogRecPtr Ptr,
TransactionId xid,
bool skipped_xact
);
typedef struct LogicalDecodingContext
{
/* memory context this is all allocated in */
MemoryContext context;
/* The associated replication slot */
ReplicationSlot *slot;
/* infrastructure pieces for decoding */
XLogReaderState *reader;
struct ReorderBuffer *reorder;
struct SnapBuild *snapshot_builder;
/*
* Marks the logical decoding context as fast forward decoding one. Such a
* context does not have plugin loaded so most of the following properties
* are unused.
*/
bool fast_forward;
OutputPluginCallbacks callbacks;
OutputPluginOptions options;
/*
* User specified options
*/
List *output_plugin_options;
/*
* User-Provided callback for writing/streaming out data.
*/
LogicalOutputPluginWriterPrepareWrite prepare_write;
LogicalOutputPluginWriterWrite write;
LogicalOutputPluginWriterUpdateProgress update_progress;
/*
* Output buffer.
*/
StringInfo out;
/*
* Private data pointer of the output plugin.
*/
void *output_plugin_private;
/*
* Private data pointer for the data writer.
*/
void *output_writer_private;
/*
* Does the output plugin support streaming, and is it enabled?
*/
bool streaming;
/*
* Does the output plugin support two-phase decoding, and is it enabled?
*/
bool twophase;
Add support for prepared transactions to built-in logical replication. To add support for streaming transactions at prepare time into the built-in logical replication, we need to do the following things: * Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle two-phase transactions by replaying them on prepare. * Add a new SUBSCRIPTION option "two_phase" to allow users to enable two-phase transactions. We enable the two_phase once the initial data sync is over. We however must explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway. The streaming option is not allowed with this new two_phase option. This can be done as a separate patch. We don't allow to toggle two_phase option of a subscription because it can lead to an inconsistent replica. For the same reason, we don't allow to refresh the publication once the two_phase is enabled for a subscription unless copy_data option is false. Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow Tested-By: Haiying Tang Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
2021-07-13 22:03:50 -04:00
/*
* Is two-phase option given by output plugin?
*
* This flag indicates that the plugin passed in the two-phase option as
* part of the START_STREAMING command. We can't rely solely on the
* twophase flag which only tells whether the plugin provided all the
* necessary two-phase callbacks.
*/
bool twophase_opt_given;
/*
* State for writing output.
*/
bool accept_writes;
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
/* Are we processing the end LSN of a transaction? */
bool end_xact;
Fix possibility of logical decoding partial transaction changes. When creating and initializing a logical slot, the restart_lsn is set to the latest WAL insertion point (or the latest replay point on standbys). Subsequently, WAL records are decoded from that point to find the start point for extracting changes in the DecodingContextFindStartpoint() function. Since the initial restart_lsn could be in the middle of a transaction, the start point must be a consistent point where we won't see the data for partial transactions. Previously, when not building a full snapshot, serialized snapshots were restored, and the SnapBuild jumps to the consistent state even while finding the start point. Consequently, the slot's restart_lsn and confirmed_flush could be set to the middle of a transaction. This could lead to various unexpected consequences. Specifically, there were reports of logical decoding decoding partial transactions, and assertion failures occurred because only subtransactions were decoded without decoding their top-level transaction until decoding the commit record. To resolve this issue, the changes prevent restoring the serialized snapshot and jumping to the consistent state while finding the start point. On v17 and HEAD, a flag indicating whether snapshot restores should be skipped has been added to the SnapBuild struct, and SNAPBUILD_VERSION has been bumpded. On backbranches, the flag is stored in the LogicalDecodingContext instead, preserving on-disk compatibility. Backpatch to all supported versions. Reported-by: Drew Callahan Reviewed-by: Amit Kapila, Hayato Kuroda Discussion: https://postgr.es/m/2444AA15-D21B-4CCE-8052-52C7C2DAFE5C%40amazon.com Backpatch-through: 12
2024-07-11 09:48:18 -04:00
/*
* True if the logical decoding context being used for the creation
* of a logical replication slot.
*/
bool in_create;
} LogicalDecodingContext;
extern void CheckLogicalDecodingRequirements(void);
2020-08-08 01:31:52 -04:00
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
extern void LogicalIncreaseXminForSlot(XLogRecPtr current_lsn,
TransactionId xmin);
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
TransactionId xid, const char *gid);
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 13:30:53 -04:00
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
Implement streaming mode in ReorderBuffer. Instead of serializing the transaction to disk after reaching the logical_decoding_work_mem limit in memory, we consume the changes we have in memory and invoke stream API methods added by commit 45fdc9738b. However, sometimes if we have incomplete toast or speculative insert we spill to the disk because we can't generate the complete tuple and stream. And, as soon as we get the complete tuple we stream the transaction including the serialized changes. We can do this incremental processing thanks to having assignments (associating subxact with toplevel xacts) in WAL right away, and thanks to logging the invalidation messages at each command end. These features are added by commits 0bead9af48 and c55040ccd0 respectively. Now that we can stream in-progress transactions, the concurrent aborts may cause failures when the output plugin consults catalogs (both system and user-defined). We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK sqlerrcode from system table scan APIs to the backend or WALSender decoding a specific uncommitted transaction. The decoding logic on the receipt of such a sqlerrcode aborts the decoding of the current transaction and continue with the decoding of other transactions. We have ReorderBufferTXN pointer in each ReorderBufferChange by which we know which xact it belongs to. The output plugin can use this to decide which changes to discard in case of stream_abort_cb (e.g. when a subxact gets discarded). We also provide a new option via SQL APIs to fetch the changes being streamed. Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
2020-08-07 22:04:39 -04:00
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
Introduce replication progress tracking infrastructure. When implementing a replication solution ontop of logical decoding, two related problems exist: * How to safely keep track of replication progress * How to change replication behavior, based on the origin of a row; e.g. to avoid loops in bi-directional replication setups The solution to these problems, as implemented here, consist out of three parts: 1) 'replication origins', which identify nodes in a replication setup. 2) 'replication progress tracking', which remembers, for each replication origin, how far replay has progressed in a efficient and crash safe manner. 3) The ability to filter out changes performed on the behest of a replication origin during logical decoding; this allows complex replication topologies. E.g. by filtering all replayed changes out. Most of this could also be implemented in "userspace", e.g. by inserting additional rows contain origin information, but that ends up being much less efficient and more complicated. We don't want to require various replication solutions to reimplement logic for this independently. The infrastructure is intended to be generic enough to be reusable. This infrastructure also replaces the 'nodeid' infrastructure of commit timestamps. It is intended to provide all the former capabilities, except that there's only 2^16 different origins; but now they integrate with logical decoding. Additionally more functionality is accessible via SQL. Since the commit timestamp infrastructure has also been introduced in 9.5 (commit 73c986add) changing the API is not a problem. For now the number of origins for which the replication progress can be tracked simultaneously is determined by the max_replication_slots GUC. That GUC is not a perfect match to configure this, but there doesn't seem to be sufficient reason to introduce a separate new one. Bumps both catversion and wal page magic. Author: Andres Freund, with contributions from Petr Jelinek and Craig Ringer Reviewed-By: Heikki Linnakangas, Petr Jelinek, Robert Haas, Steve Singer Discussion: 20150216002155.GI15326@awork2.anarazel.de, 20140923182422.GA15776@alap3.anarazel.de, 20131114172632.GE7522@alap2.anarazel.de
2015-04-29 13:30:53 -04:00
#endif