postgresql/src/include/replication/worker_internal.h
Peter Eisentraut de43897122 Fix various concurrency issues in logical replication worker launching
The code was originally written with the assumption that the launcher is
the only process starting workers.  However, that hasn't been true since
commit 7c4f52409, which failed to modify the worker management code
adequately.

This patch adds an in_use field to the LogicalRepWorker struct to
indicate whether the worker slot is being used and uses proper locking
everywhere this flag is set or read.

However, if the parent process dies while the new worker is starting and
the new worker fails to attach to shared memory, this flag would never
get cleared.  We solve this rare corner case by adding a sort of garbage
collector for in_use slots.  This uses another field in the
LogicalRepWorker struct named launch_time that contains the time when
the worker was started.  If any request to start a new worker does not
find a free slot, we'll check for workers that were supposed to start but
took too long to actually do so, and reuse their slot.

In passing, also fix possible race conditions when stopping a worker that
hasn't finished starting yet.

Author: Petr Jelinek <petr.jelinek@2ndquadrant.com>
Reported-by: Fujii Masao <masao.fujii@gmail.com>
2017-04-26 10:45:59 -04:00

97 lines
2.7 KiB
C

/*-------------------------------------------------------------------------
*
* worker_internal.h
* Internal headers shared by logical replication workers.
*
* Portions Copyright (c) 2016-2017, PostgreSQL Global Development Group
*
* src/include/replication/worker_internal.h
*
*-------------------------------------------------------------------------
*/
#ifndef WORKER_INTERNAL_H
#define WORKER_INTERNAL_H
#include <signal.h>
#include "access/xlogdefs.h"
#include "catalog/pg_subscription.h"
#include "datatype/timestamp.h"
#include "storage/lock.h"
/*
 * LogicalRepWorker
 *		Bookkeeping for one logical replication worker slot.
 *
 * A slot is claimed by setting in_use and released by clearing it; a worker
 * process reaches its own slot through MyLogicalRepWorker.  NOTE(review):
 * the slots are presumably an array in shared memory that a starting worker
 * attaches to via logicalrep_worker_attach(slot) -- confirm in launcher.c.
 */
typedef struct LogicalRepWorker
{
/*
 * Time at which this worker was launched.  A start request that finds no
 * free slot uses this to spot workers that took too long to actually start,
 * so that their slot can be reused.
 */
TimestampTz launch_time;
/*
 * Indicates if this slot is used or free.  Must only be set or read while
 * holding the lock that protects slot management (NOTE(review): confirm
 * which lock in launcher.c).
 */
bool in_use;
/*
 * Increased every time the slot is taken by a new worker; presumably lets
 * code holding a slot pointer detect that the slot was reused.
 */
uint16 generation;
/* PGPROC of the process running in this slot; NULL if not running. */
PGPROC *proc;
/* Database id to connect to. */
Oid dbid;
/* User to use for connection (will be same as owner of subscription). */
Oid userid;
/* Subscription id for the worker. */
Oid subid;
/*
 * Used for initial table synchronization.  relid is valid only for table
 * synchronization workers (see am_tablesync_worker()).  NOTE(review):
 * relmutex is a spinlock that presumably protects relstate and
 * relstate_lsn -- confirm in tablesync.c.
 */
Oid relid;
char relstate;
XLogRecPtr relstate_lsn;
slock_t relmutex;
/*
 * Stats.  NOTE(review): these fields are written outside this header;
 * confirm their exact meaning (positions and send/receive/reply times)
 * against the apply worker code.
 */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
TimestampTz last_recv_time;
XLogRecPtr reply_lsn;
TimestampTz reply_time;
} LogicalRepWorker;
/*
 * Globals describing the current logical replication worker.
 */

/* Subscription served by this worker, and the worker's own slot. */
extern Subscription *MySubscription;
extern LogicalRepWorker *MyLogicalRepWorker;

/* Replication connection, opened through libpqwalreceiver. */
extern struct WalReceiverConn *wrconn;

/* Memory context holding variables cached by the apply worker. */
extern MemoryContext ApplyCacheContext;

/*
 * NOTE(review): presumably true while a transaction received from the
 * remote side is being applied -- confirm in worker.c.
 */
extern bool in_remote_transaction;

/*
 * Pending-signal flags.  Declared volatile sig_atomic_t so that a signal
 * handler may set them (presumably logicalrep_worker_sighup() and
 * logicalrep_worker_sigterm()).
 */
extern volatile sig_atomic_t got_SIGHUP;
extern volatile sig_atomic_t got_SIGTERM;
/*
 * Worker slot management.
 *
 * Workers are identified by (subid, relid).  NOTE(review): relid ==
 * InvalidOid presumably designates the main apply worker of a subscription,
 * matching am_tablesync_worker() treating an invalid relid as "not a
 * tablesync worker" -- confirm in launcher.c.
 */
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
					   bool only_running);
extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
						 Oid userid, Oid relid);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);

/* NOTE(review): presumably counts table synchronization workers of subid. */
extern int	logicalrep_sync_worker_count(Oid subid);

/* Signal handlers (SIGNAL_ARGS signature) for worker processes. */
extern void logicalrep_worker_sighup(SIGNAL_ARGS);
extern void logicalrep_worker_sigterm(SIGNAL_ARGS);

/*
 * Table synchronization support.
 *
 * These two prototypes previously lacked "extern", unlike every other
 * declaration in this header; it is added for consistency (it does not
 * change linkage of a function declaration).
 */
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
extern void process_syncing_tables(XLogRecPtr current_lsn);

/*
 * NOTE(review): the (Datum, int, uint32) signature matches a syscache
 * invalidation callback; presumably registered via
 * CacheRegisterSyscacheCallback -- confirm in the caller.
 */
extern void invalidate_syncing_table_states(Datum arg, int cacheid,
								uint32 hashvalue);
static inline bool
am_tablesync_worker(void)
{
return OidIsValid(MyLogicalRepWorker->relid);
}
#endif /* WORKER_INTERNAL_H */