mirror of
https://github.com/postgres/postgres.git
synced 2026-03-09 09:40:40 -04:00
The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit. To fix this we try to send the keepalive message if required after processing certain threshold of changes. Reported-by: Fabrice Chapuis Author: Wang wei and Amit Kapila Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda Backpatch-through: 10 Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
137 lines
4 KiB
C
137 lines
4 KiB
C
/*-------------------------------------------------------------------------
|
|
* logical.h
|
|
* PostgreSQL logical decoding coordination
|
|
*
|
|
* Copyright (c) 2012-2021, PostgreSQL Global Development Group
|
|
*
|
|
*-------------------------------------------------------------------------
|
|
*/
|
|
#ifndef LOGICAL_H
|
|
#define LOGICAL_H
|
|
|
|
#include "access/xlog.h"
|
|
#include "access/xlogreader.h"
|
|
#include "replication/output_plugin.h"
|
|
#include "replication/slot.h"
|
|
|
|
struct LogicalDecodingContext;
|
|
|
|
typedef void (*LogicalOutputPluginWriterWrite) (struct LogicalDecodingContext *lr,
|
|
XLogRecPtr Ptr,
|
|
TransactionId xid,
|
|
bool last_write
|
|
);
|
|
|
|
typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;
|
|
|
|
typedef void (*LogicalOutputPluginWriterUpdateProgress) (struct LogicalDecodingContext *lr,
|
|
XLogRecPtr Ptr,
|
|
TransactionId xid
|
|
);
|
|
|
|
typedef struct LogicalDecodingContext
|
|
{
|
|
/* memory context this is all allocated in */
|
|
MemoryContext context;
|
|
|
|
/* The associated replication slot */
|
|
ReplicationSlot *slot;
|
|
|
|
/* infrastructure pieces for decoding */
|
|
XLogReaderState *reader;
|
|
struct ReorderBuffer *reorder;
|
|
struct SnapBuild *snapshot_builder;
|
|
|
|
/*
|
|
* Marks the logical decoding context as fast forward decoding one. Such a
|
|
* context does not have plugin loaded so most of the following properties
|
|
* are unused.
|
|
*/
|
|
bool fast_forward;
|
|
|
|
/* Are we processing the end LSN of a transaction? */
|
|
bool end_xact;
|
|
|
|
OutputPluginCallbacks callbacks;
|
|
OutputPluginOptions options;
|
|
|
|
/*
|
|
* User specified options
|
|
*/
|
|
List *output_plugin_options;
|
|
|
|
/*
|
|
* User-Provided callback for writing/streaming out data.
|
|
*/
|
|
LogicalOutputPluginWriterPrepareWrite prepare_write;
|
|
LogicalOutputPluginWriterWrite write;
|
|
LogicalOutputPluginWriterUpdateProgress update_progress;
|
|
|
|
/*
|
|
* Output buffer.
|
|
*/
|
|
StringInfo out;
|
|
|
|
/*
|
|
* Private data pointer of the output plugin.
|
|
*/
|
|
void *output_plugin_private;
|
|
|
|
/*
|
|
* Private data pointer for the data writer.
|
|
*/
|
|
void *output_writer_private;
|
|
|
|
/*
|
|
* Does the output plugin support streaming, and is it enabled?
|
|
*/
|
|
bool streaming;
|
|
|
|
/*
|
|
* Does the output plugin support two-phase decoding, and is it enabled?
|
|
*/
|
|
bool twophase;
|
|
|
|
/*
|
|
* State for writing output.
|
|
*/
|
|
bool accept_writes;
|
|
bool prepared_write;
|
|
XLogRecPtr write_location;
|
|
TransactionId write_xid;
|
|
} LogicalDecodingContext;
|
|
|
|
|
|
extern void CheckLogicalDecodingRequirements(void);
|
|
|
|
extern LogicalDecodingContext *CreateInitDecodingContext(const char *plugin,
|
|
List *output_plugin_options,
|
|
bool need_full_snapshot,
|
|
XLogRecPtr restart_lsn,
|
|
XLogReaderRoutine *xl_routine,
|
|
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
|
LogicalOutputPluginWriterWrite do_write,
|
|
LogicalOutputPluginWriterUpdateProgress update_progress);
|
|
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
|
|
List *output_plugin_options,
|
|
bool fast_forward,
|
|
XLogReaderRoutine *xl_routine,
|
|
LogicalOutputPluginWriterPrepareWrite prepare_write,
|
|
LogicalOutputPluginWriterWrite do_write,
|
|
LogicalOutputPluginWriterUpdateProgress update_progress);
|
|
extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
|
|
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
|
|
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
|
|
|
|
extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin);
|
|
extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
|
|
XLogRecPtr restart_lsn);
|
|
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
|
|
|
|
extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
|
|
TransactionId xid, const char *gid);
|
|
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
|
|
extern void ResetLogicalStreamingState(void);
|
|
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
|
|
|
|
#endif
|