postgresql/src/include/replication/walreceiver.h
Alvaro Herrera 572d6ee6d4 Fix locking in WAL receiver/sender shmem state structs
In WAL receiver and WAL sender, some accesses to their corresponding
shared memory control structs were done without holding any kind of
lock, which could lead to inconsistent and possibly insecure results.

In walsender, fix by clarifying the locking rules and following them
correctly, as documented in the new comment in walsender_private.h;
namely that some members can be read in walsender itself without a lock,
because the only writes occur in the same process.  The rest of the
struct requires spinlock for accesses, as usual.

In walreceiver, fix by always holding spinlock while accessing the
struct.

While there is potentially a problem in all branches, it is minor in
stable ones.  This only became a real problem in pg10 because of quorum
commit in synchronous replication (commit 3901fd70cc), and a potential
security problem in walreceiver because a superuser() check was removed
by default monitoring roles (commit 25fff40798).  Thus, no backpatch.

In passing, clean up some leftover braces which were used to create
unconditional blocks.  Once upon a time these were used for
volatile-izing accesses to those shmem structs, which is no longer
required.  Many other occurrences of this pattern remain.

Author: Michaël Paquier
Reported-by: Michaël Paquier
Reviewed-by: Masahiko Sawada, Kyotaro Horiguchi, Thomas Munro,
	Robert Haas
Discussion: https://postgr.es/m/CAB7nPqTWYqtzD=LN_oDaf9r-hAjUEPAy0B9yRkhcsLdRN8fzrw@mail.gmail.com
2017-06-30 18:06:33 -04:00

301 lines
9.7 KiB
C

/*-------------------------------------------------------------------------
*
* walreceiver.h
* Exports from replication/walreceiverfuncs.c.
*
* Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
*
* src/include/replication/walreceiver.h
*
*-------------------------------------------------------------------------
*/
#ifndef _WALRECEIVER_H
#define _WALRECEIVER_H
#include "access/xlog.h"
#include "access/xlogdefs.h"
#include "fmgr.h"
#include "replication/logicalproto.h"
#include "replication/walsender.h"
#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
#include "utils/tuplestore.h"
/*
 * User-settable parameters.  Only the declarations are provided here; the
 * variables themselves are defined outside this header.
 */
extern int wal_receiver_status_interval;
extern int wal_receiver_timeout;
extern bool hot_standby_feedback;
/*
 * MAXCONNINFO: maximum size of a connection string.  It is used below as
 * the size of the conninfo array in WalRcvData.
 *
 * XXX: Should this move to pg_config_manual.h?
 */
#define MAXCONNINFO 1024
/*
 * AllowCascadeReplication
 *		Can this standby accept a replication connection from another
 *		standby?
 *
 * Evaluates to nonzero only when EnableHotStandby is set and
 * max_wal_senders is greater than zero.
 */
#define AllowCascadeReplication() \
    (EnableHotStandby && (max_wal_senders > 0))
/*
 * Possible values of WalRcv->walRcvState.
 *
 * The numeric values are written out explicitly; they are the same values
 * the compiler assigns when the initializers are omitted.
 */
typedef enum
{
    /* stopped and mustn't start up again */
    WALRCV_STOPPED = 0,

    /* launched, but the process hasn't initialized yet */
    WALRCV_STARTING = 1,

    /* walreceiver is streaming */
    WALRCV_STREAMING = 2,

    /* stopped streaming, waiting for orders */
    WALRCV_WAITING = 3,

    /* asked to restart streaming */
    WALRCV_RESTARTING = 4,

    /* requested to stop, but still running */
    WALRCV_STOPPING = 5
} WalRcvState;
/*
 * Shared memory area for management of walreceiver process.
 *
 * Every field declared before 'mutex' is covered by that spinlock (see the
 * comment on 'mutex').  'force_reply', declared after it, is documented as
 * not needing the lock.
 */
typedef struct
{
    /*
     * PID of currently active walreceiver process, its current state and
     * start time (actually, the time at which it was requested to be
     * started).
     */
    pid_t pid;
    WalRcvState walRcvState;
    pg_time_t startTime;
    /*
     * receiveStart and receiveStartTLI indicate the first byte position and
     * timeline that will be received. When startup process starts the
     * walreceiver, it sets these to the point where it wants the streaming to
     * begin.
     */
    XLogRecPtr receiveStart;
    TimeLineID receiveStartTLI;
    /*
     * receivedUpto-1 is the last byte position that has already been
     * received, and receivedTLI is the timeline it came from.  At the first
     * startup of walreceiver, these are set to receiveStart and
     * receiveStartTLI. After that, walreceiver updates these whenever it
     * flushes the received WAL to disk.
     */
    XLogRecPtr receivedUpto;
    TimeLineID receivedTLI;
    /*
     * latestChunkStart is the starting byte position of the current "batch"
     * of received WAL.  It's actually the same as the previous value of
     * receivedUpto before the last flush to disk.  Startup process can use
     * this to detect whether it's keeping up or not.
     */
    XLogRecPtr latestChunkStart;
    /*
     * Time of send and receive of any message received.
     */
    TimestampTz lastMsgSendTime;
    TimestampTz lastMsgReceiptTime;
    /*
     * Latest reported end of WAL on the sender, and its associated
     * timestamp.
     */
    XLogRecPtr latestWalEnd;
    TimestampTz latestWalEndTime;
    /*
     * connection string; initially set to connect to the primary, and later
     * clobbered to hide security-sensitive fields.  The buffer holds
     * MAXCONNINFO bytes.
     */
    char conninfo[MAXCONNINFO];
    /*
     * replication slot name; is also used for walreceiver to connect with the
     * primary.  The buffer holds NAMEDATALEN bytes.
     */
    char slotname[NAMEDATALEN];
    /* set true once conninfo is ready to display (obfuscated pwds etc) */
    bool ready_to_display;
    slock_t mutex; /* locks shared variables shown above */
    /*
     * force walreceiver reply?  This doesn't need to be locked; memory
     * barriers for ordering are sufficient.
     */
    bool force_reply;
    /*
     * Latch used by startup process to wake up walreceiver after telling it
     * where to start streaming (after setting receiveStart and
     * receiveStartTLI), and also to tell it to send apply feedback to the
     * primary whenever specially marked commit records are applied.  This is
     * normally mapped to procLatch when walreceiver is running.
     */
    Latch *latch;
} WalRcvData;
/* Pointer to the shared WalRcvData area; defined outside this header. */
extern WalRcvData *WalRcv;
/*
 * Options describing the stream to start.  The walrcv_startstreaming hook
 * takes a const pointer to this struct.
 */
typedef struct
{
    bool logical; /* True if this is logical replication stream,
                   * false if physical stream. */
    char *slotname; /* Name of the replication slot or NULL. */
    XLogRecPtr startpoint; /* LSN of starting point. */
    /*
     * Protocol-specific options.  Presumably the 'physical' member applies
     * when 'logical' is false and the 'logical' member when it is true --
     * confirm against the code that fills in and reads this struct.
     */
    union
    {
        struct
        {
            TimeLineID startpointTLI; /* Starting timeline */
        } physical;
        struct
        {
            uint32 proto_version; /* Logical protocol version */
            List *publication_names; /* String list of publications */
        } logical;
    } proto;
} WalRcvStreamOptions;
/*
 * Connection handle passed to the walrcv_* hooks.  Only the struct tag is
 * declared in this header, so the type is incomplete here and can be used
 * solely through pointers.
 */
typedef struct WalReceiverConn WalReceiverConn;
/*
 * Status of walreceiver query execution.
 *
 * We only define statuses that are currently used.  The numeric values are
 * written out explicitly; they are the same values the compiler assigns
 * when the initializers are omitted.
 */
typedef enum
{
    /* There was error when executing the query. */
    WALRCV_ERROR = 0,

    /* Query executed utility or replication command. */
    WALRCV_OK_COMMAND = 1,

    /* Query returned tuples. */
    WALRCV_OK_TUPLES = 2,

    /* Query started COPY FROM. */
    WALRCV_OK_COPY_IN = 3,

    /* Query started COPY TO. */
    WALRCV_OK_COPY_OUT = 4,

    /* Query started COPY BOTH replication protocol. */
    WALRCV_OK_COPY_BOTH = 5
} WalRcvExecStatus;
/*
 * Return value for walrcv_exec, returns the status of the execution and
 * tuples if any.
 *
 * Release with walrcv_clear_result(), which frees the members listed below
 * (when set) and then the struct itself.
 */
typedef struct WalRcvExecResult
{
    WalRcvExecStatus status; /* outcome of the execution */
    char *err; /* pfree'd by walrcv_clear_result if non-NULL;
                * presumably the error message -- confirm */
    Tuplestorestate *tuplestore; /* ended with tuplestore_end if non-NULL */
    TupleDesc tupledesc; /* released with FreeTupleDesc if non-NULL */
} WalRcvExecResult;
/*
 * libpqwalreceiver hooks
 *
 * Function-pointer types for the entries of WalReceiverFunctionsType.  The
 * implementations are not part of this header, so the notes below describe
 * only what the signatures show; anything marked "presumably" should be
 * confirmed against the implementation.
 */

/*
 * Returns a connection handle for 'conninfo'.  'err' is presumably set to
 * an error message when no connection could be made.
 */
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo, bool logical,
                                               const char *appname,
                                               char **err);
/* Takes a connection string and returns nothing. */
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo);
/* Returns a string for the given connection (presumably its conninfo). */
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
/*
 * Returns a string; 'primary_tli' and 'server_version' are pointer
 * parameters, presumably outputs.
 */
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
                                            TimeLineID *primary_tli,
                                            int *server_version);
/*
 * Takes a timeline ID; 'filename', 'content' and 'size' are pointer
 * parameters, presumably outputs describing the fetched history file.
 */
typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
                                                   TimeLineID tli,
                                                   char **filename,
                                                   char **content, int *size);
/* Starts streaming according to 'options'; returns a bool result. */
typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
                                          const WalRcvStreamOptions *options);
/* Ends streaming; 'next_tli' is a pointer parameter, presumably an output. */
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
                                        TimeLineID *next_tli);
/*
 * Returns an int; 'buffer' and 'wait_fd' are pointer parameters, presumably
 * outputs (received data and a socket to wait on).
 */
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn, char **buffer,
                                  pgsocket *wait_fd);
/* Sends 'nbytes' bytes from 'buffer' over the connection. */
typedef void (*walrcv_send_fn) (WalReceiverConn *conn, const char *buffer,
                                int nbytes);
/*
 * Creates a replication slot named 'slotname'.  Returns a string; 'lsn' is
 * a pointer parameter, presumably an output.
 */
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
                                        const char *slotname, bool temporary,
                                        CRSSnapshotAction snapshot_action,
                                        XLogRecPtr *lsn);
/*
 * Executes 'query' and returns a WalRcvExecResult.  'retTypes' points to
 * type OIDs, presumably 'nRetTypes' of them.
 */
typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
                                             const char *query,
                                             const int nRetTypes,
                                             const Oid *retTypes);
/* Takes a connection handle and returns nothing. */
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
/*
 * Table of hook function pointers, one member per walrcv_*_fn type.  The
 * walrcv_*() macros in this header call through these members by name.
 */
typedef struct WalReceiverFunctionsType
{
    walrcv_connect_fn walrcv_connect;
    walrcv_check_conninfo_fn walrcv_check_conninfo;
    walrcv_get_conninfo_fn walrcv_get_conninfo;
    walrcv_identify_system_fn walrcv_identify_system;
    walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
    walrcv_startstreaming_fn walrcv_startstreaming;
    walrcv_endstreaming_fn walrcv_endstreaming;
    walrcv_receive_fn walrcv_receive;
    walrcv_send_fn walrcv_send;
    walrcv_create_slot_fn walrcv_create_slot;
    walrcv_exec_fn walrcv_exec;
    walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;
/*
 * Pointer to the active hook table; defined outside this header.  The
 * walrcv_*() macros dereference it without checking for NULL.
 */
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
/*
 * Convenience wrappers around the WalReceiverFunctions table.
 *
 * Each walrcv_xxx(...) macro calls the member of the same name through the
 * WalReceiverFunctions pointer, forwarding its arguments unchanged; every
 * argument is evaluated exactly once.  The pointer is dereferenced without
 * any NULL check.
 */
#define walrcv_connect(ci, is_logical, app, errp) \
    WalReceiverFunctions->walrcv_connect((ci), (is_logical), (app), (errp))

#define walrcv_check_conninfo(ci) \
    WalReceiverFunctions->walrcv_check_conninfo((ci))

#define walrcv_get_conninfo(c) \
    WalReceiverFunctions->walrcv_get_conninfo((c))

#define walrcv_identify_system(c, tli_out, version_out) \
    WalReceiverFunctions->walrcv_identify_system((c), (tli_out), (version_out))

#define walrcv_readtimelinehistoryfile(c, tl, fname, data, len) \
    WalReceiverFunctions->walrcv_readtimelinehistoryfile((c), (tl), (fname), (data), (len))

#define walrcv_startstreaming(c, opts) \
    WalReceiverFunctions->walrcv_startstreaming((c), (opts))

#define walrcv_endstreaming(c, tli_out) \
    WalReceiverFunctions->walrcv_endstreaming((c), (tli_out))

#define walrcv_receive(c, buf, fd_out) \
    WalReceiverFunctions->walrcv_receive((c), (buf), (fd_out))

#define walrcv_send(c, buf, len) \
    WalReceiverFunctions->walrcv_send((c), (buf), (len))

#define walrcv_create_slot(c, slot, is_temp, snap_action, lsn_out) \
    WalReceiverFunctions->walrcv_create_slot((c), (slot), (is_temp), (snap_action), (lsn_out))

#define walrcv_exec(c, sql, ntypes, types) \
    WalReceiverFunctions->walrcv_exec((c), (sql), (ntypes), (types))

#define walrcv_disconnect(c) \
    WalReceiverFunctions->walrcv_disconnect((c))
/*
 * walrcv_clear_result
 *		Release a WalRcvExecResult together with the objects it refers to.
 *
 * A NULL 'walres' is accepted and ignored.  Otherwise the error string, the
 * tuplestore and the tuple descriptor are each released if set, and finally
 * the result struct itself is pfree'd.  Each member is tested before being
 * released, so no release routine is ever handed a NULL pointer.
 */
static inline void
walrcv_clear_result(WalRcvExecResult *walres)
{
    if (walres != NULL)
    {
        if (walres->err != NULL)
            pfree(walres->err);

        if (walres->tuplestore != NULL)
            tuplestore_end(walres->tuplestore);

        if (walres->tupledesc != NULL)
            FreeTupleDesc(walres->tupledesc);

        pfree(walres);
    }
}
/*
 * prototypes for functions in walreceiver.c
 *
 * WalReceiverMain is declared as never returning to its caller.
 */
extern void WalReceiverMain(void) pg_attribute_noreturn();
/*
 * prototypes for functions in walreceiverfuncs.c
 *
 * The definitions are not visible from this header; the notes below are
 * based on the signatures only and should be confirmed in
 * walreceiverfuncs.c.
 */
/* Size and initialization of a shared memory area (presumably WalRcvData). */
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
/* Shutdown request and state queries; the queries return bool. */
extern void ShutdownWalRcv(void);
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
/*
 * Takes a timeline, a start position, a connection string and a slot name;
 * presumably asks for streaming to begin from that point.
 */
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
                                 const char *conninfo, const char *slotname);
/*
 * Returns an XLogRecPtr; 'latestChunkStart' and 'receiveTLI' are pointer
 * parameters, presumably optional outputs.
 */
extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
/* Both return an int; units are not stated in this header. */
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
/* Presumably related to the force_reply flag in WalRcvData -- confirm. */
extern void WalRcvForceReply(void);
#endif /* _WALRECEIVER_H */