mirror of
https://github.com/postgres/postgres.git
synced 2026-03-13 06:06:25 -04:00
To add support for streaming transactions at prepare time to the built-in logical replication, we need to do the following things: * Modify the output plugin (pgoutput) to implement the new two-phase API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker to properly handle two-phase transactions by replaying them on prepare. * Add a new SUBSCRIPTION option "two_phase" to allow users to enable two-phase transactions. We enable two_phase once the initial data sync is over. However, we must explicitly disable replication of two-phase transactions during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover, we don't have a replication connection open so we don't know where to send the data anyway. The streaming option is not allowed together with this new two_phase option; support for combining the two can be added in a separate patch. We don't allow toggling the two_phase option of a subscription because it can lead to an inconsistent replica. For the same reason, we don't allow refreshing the publication once two_phase is enabled for a subscription unless the copy_data option is false. Author: Peter Smith, Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Sawada Masahiko, Vignesh C, Dilip Kumar, Takamichi Osumi, Greg Nancarrow Tested-By: Haiying Tang Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAA4eK1+opiV4aFTmWWUF9h_32=HfPOW9vZASHarT0UA5oBrtGw@mail.gmail.com
144 lines
4.2 KiB
C
144 lines
4.2 KiB
C
/*-------------------------------------------------------------------------
|
|
* logical.h
|
|
* PostgreSQL logical decoding coordination
|
|
*
|
|
* Copyright (c) 2012-2021, PostgreSQL Global Development Group
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#ifndef LOGICAL_H
|
|
#define LOGICAL_H
|
|
|
|
#include "access/xlog.h"
|
|
#include "access/xlogreader.h"
|
|
#include "replication/output_plugin.h"
|
|
#include "replication/slot.h"
|
|
|
|
/*
 * Forward declaration: the callback typedefs below take a pointer to this
 * struct before its full definition appears later in this header.
 */
struct LogicalDecodingContext;
|
|
|
|
typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr,
|
|
XLogRecPtr Ptr,
|
|
TransactionId xid,
|
|
bool last_write
|
|
);
|
|
|
|
/*
 * The prepare-write callback has exactly the same signature as the write
 * callback, so its type is a plain alias of LogicalOutputPluginWriterWrite.
 */
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
|
|
|
|
typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
|
|
XLogRecPtr Ptr,
|
|
TransactionId xid
|
|
);
|
|
|
|
typedef struct LogicalDecodingContext
|
|
{
|
|
/* memory context this is all allocated in */
|
|
MemoryContext context;
|
|
|
|
/* The associated replication slot */
|
|
ReplicationSlot *slot;
|
|
|
|
/* infrastructure pieces for decoding */
|
|
XLogReaderState *reader;
|
|
struct ReorderBuffer *reorder;
|
|
struct SnapBuild *snapshot_builder;
|
|
|
|
/*
|
|
* Marks the logical decoding context as fast forward decoding one. Such a
|
|
* context does not have plugin loaded so most of the following properties
|
|
* are unused.
|
|
*/
|
|
bool fast_forward;
|
|
|
|
OutputPluginCallbacks callbacks;
|
|
OutputPluginOptions options;
|
|
|
|
/*
|
|
* User specified options
|
|
*/
|
|
List *output_plugin_options;
|
|
|
|
/*
|
|
* User-Provided callback for writing/streaming out data.
|
|
*/
|
|
LogicalOutputPluginWriterPrepareWrite prepare_write;
|
|
LogicalOutputPluginWriterWrite write;
|
|
LogicalOutputPluginWriterUpdateProgress update_progress;
|
|
|
|
/*
|
|
* Output buffer.
|
|
*/
|
|
StringInfo out;
|
|
|
|
/*
|
|
* Private data pointer of the output plugin.
|
|
*/
|
|
void *output_plugin_private;
|
|
|
|
/*
|
|
* Private data pointer for the data writer.
|
|
*/
|
|
void *output_writer_private;
|
|
|
|
/*
|
|
* Does the output plugin support streaming, and is it enabled?
|
|
*/
|
|
bool streaming;
|
|
|
|
/*
|
|
* Does the output plugin support two-phase decoding, and is it enabled?
|
|
*/
|
|
bool twophase;
|
|
|
|
/*
|
|
* Is two-phase option given by output plugin?
|
|
*
|
|
* This flag indicates that the plugin passed in the two-phase option as
|
|
* part of the START_STREAMING command. We can't rely solely on the
|
|
* twophase flag which only tells whether the plugin provided all the
|
|
* necessary two-phase callbacks.
|
|
*/
|
|
bool twophase_opt_given;
|
|
|
|
/*
|
|
* State for writing output.
|
|
*/
|
|
bool accept_writes;
|
|
bool prepared_write;
|
|
XLogRecPtr write_location;
|
|
TransactionId write_xid;
|
|
} LogicalDecodingContext;
|
|
|
|
|
|
/*
 * Takes no arguments and returns nothing.
 * NOTE(review): presumably reports an error when the prerequisites for
 * logical decoding are not met -- the definition is outside this header;
 * confirm there.
 */
extern void CheckLogicalDecodingRequirements(void);
|
|
|
|
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
|
|
List *output_plugin_options,
|
|
bool need_full_snapshot,
|
|
XLogRecPtr restart_lsn,
|
|
XLogReaderRoutine *xl_routine,
|
|
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
|
LogicalOutputPluginWriterWrite do_write,
|
|
LogicalOutputPluginWriterUpdateProgress update_progress);
|
|
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
|
|
List *output_plugin_options,
|
|
bool fast_forward,
|
|
XLogReaderRoutine *xl_routine,
|
|
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
|
LogicalOutputPluginWriterWrite do_write,
|
|
LogicalOutputPluginWriterUpdateProgress update_progress);
|
|
/*
 * Operates on the given decoding context and returns nothing.
 * NOTE(review): presumably advances the context until a usable decoding
 * start point has been found -- confirm against the definition.
 */
extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
|
|
/*
 * Boolean query on the given decoding context.
 * NOTE(review): presumably true once the context is ready to decode --
 * confirm against the definition.
 */
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
|
|
/*
 * Takes a decoding context and returns nothing.
 * NOTE(review): presumably releases the context and what it owns; callers
 * should treat ctx as invalid afterwards -- confirm against the definition.
 */
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
|
|
|
|
/*
 * Takes an LSN and an xmin transaction id; returns nothing.  No slot or
 * context is passed in.
 * NOTE(review): presumably acts on the slot currently held by the calling
 * process -- confirm against the definition.
 */
extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
|
|
/*
 * Takes two LSNs (current_lsn and restart_lsn); returns nothing.  No slot or
 * context is passed in.
 * NOTE(review): presumably acts on the slot currently held by the calling
 * process -- confirm against the definition.
 */
extern void LogicalIncreaseRestartDecodingForSlot(
	XLogRecPtr current_lsn,
	XLogRecPtr restart_lsn);
|
|
/*
 * Takes a single LSN; returns nothing.  No slot or context is passed in.
 * NOTE(review): presumably records lsn as confirmed-received for the slot
 * currently held by the calling process -- confirm against the definition.
 */
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
|
|
|
|
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
|
|
TransactionId xid, const char *gid);
|
|
/*
 * Takes the decoding context and a replication origin id; returns a bool.
 * NOTE(review): presumably wraps the output plugin's filter-by-origin
 * callback -- confirm against the definition.
 */
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
|
|
/*
 * Takes no arguments and returns nothing, so whatever it resets is not
 * reached through a context passed by the caller.
 * NOTE(review): the state being reset is not visible in this header --
 * confirm against the definition.
 */
extern void ResetLogicalStreamingState(void);
|
|
/*
 * Takes the decoding context and returns nothing.
 * NOTE(review): presumably reports statistics gathered by this context --
 * confirm against the definition.
 */
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
|
|
|
|
#endif
|