mirror of
https://github.com/postgres/postgres.git
synced 2026-02-20 08:20:55 -05:00
Use ProcNumber rather than pid in ReplicationSlot
This helps the next commit. Reviewed-by: Chao Li <li.evan.chao@gmail.com> Discussion: https://www.postgresql.org/message-id/4cc13ba1-4248-4884-b6ba-4805349e7f39@iki.fi
This commit is contained in:
parent
f33c585774
commit
ddc3250208
4 changed files with 49 additions and 36 deletions
|
|
@ -1757,7 +1757,7 @@ update_synced_slots_inactive_since(void)
|
|||
Assert(SlotIsLogical(s));
|
||||
|
||||
/* The slot must not be acquired by any process */
|
||||
Assert(s->active_pid == 0);
|
||||
Assert(s->active_proc == INVALID_PROC_NUMBER);
|
||||
|
||||
/* Use the same inactive_since time for all the slots. */
|
||||
if (now == 0)
|
||||
|
|
|
|||
|
|
@ -226,6 +226,7 @@ ReplicationSlotsShmemInit(void)
|
|||
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
|
||||
|
||||
/* everything else is zeroed by the memset above */
|
||||
slot->active_proc = INVALID_PROC_NUMBER;
|
||||
SpinLockInit(&slot->mutex);
|
||||
LWLockInitialize(&slot->io_in_progress_lock,
|
||||
LWTRANCHE_REPLICATION_SLOT_IO);
|
||||
|
|
@ -461,7 +462,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
|||
* be doing that. So it's safe to initialize the slot.
|
||||
*/
|
||||
Assert(!slot->in_use);
|
||||
Assert(slot->active_pid == 0);
|
||||
Assert(slot->active_proc == INVALID_PROC_NUMBER);
|
||||
|
||||
/* first initialize persistent data */
|
||||
memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
|
||||
|
|
@ -505,8 +506,8 @@ ReplicationSlotCreate(const char *name, bool db_specific,
|
|||
|
||||
/* We can now mark the slot active, and that makes it our slot. */
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
Assert(slot->active_pid == 0);
|
||||
slot->active_pid = MyProcPid;
|
||||
Assert(slot->active_proc == INVALID_PROC_NUMBER);
|
||||
slot->active_proc = MyProcNumber;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
MyReplicationSlot = slot;
|
||||
|
||||
|
|
@ -620,6 +621,7 @@ void
|
|||
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
|
||||
{
|
||||
ReplicationSlot *s;
|
||||
ProcNumber active_proc;
|
||||
int active_pid;
|
||||
|
||||
Assert(name != NULL);
|
||||
|
|
@ -672,17 +674,18 @@ retry:
|
|||
* to inactive_since in InvalidatePossiblyObsoleteSlot.
|
||||
*/
|
||||
SpinLockAcquire(&s->mutex);
|
||||
if (s->active_pid == 0)
|
||||
s->active_pid = MyProcPid;
|
||||
active_pid = s->active_pid;
|
||||
if (s->active_proc == INVALID_PROC_NUMBER)
|
||||
s->active_proc = MyProcNumber;
|
||||
active_proc = s->active_proc;
|
||||
ReplicationSlotSetInactiveSince(s, 0, false);
|
||||
SpinLockRelease(&s->mutex);
|
||||
}
|
||||
else
|
||||
{
|
||||
s->active_pid = active_pid = MyProcPid;
|
||||
s->active_proc = active_proc = MyProcNumber;
|
||||
ReplicationSlotSetInactiveSince(s, 0, true);
|
||||
}
|
||||
active_pid = GetPGProcByNumber(active_proc)->pid;
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
|
||||
/*
|
||||
|
|
@ -690,7 +693,7 @@ retry:
|
|||
* wait until the owning process signals us that it's been released, or
|
||||
* error out.
|
||||
*/
|
||||
if (active_pid != MyProcPid)
|
||||
if (active_proc != MyProcNumber)
|
||||
{
|
||||
if (!nowait)
|
||||
{
|
||||
|
|
@ -762,7 +765,7 @@ ReplicationSlotRelease(void)
|
|||
bool is_logical;
|
||||
TimestampTz now = 0;
|
||||
|
||||
Assert(slot != NULL && slot->active_pid != 0);
|
||||
Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
|
||||
|
||||
is_logical = SlotIsLogical(slot);
|
||||
|
||||
|
|
@ -815,7 +818,7 @@ ReplicationSlotRelease(void)
|
|||
* disconnecting, but wake up others that may be waiting for it.
|
||||
*/
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
slot->active_pid = 0;
|
||||
slot->active_proc = INVALID_PROC_NUMBER;
|
||||
ReplicationSlotSetInactiveSince(slot, now, false);
|
||||
SpinLockRelease(&slot->mutex);
|
||||
ConditionVariableBroadcast(&slot->active_cv);
|
||||
|
|
@ -877,7 +880,7 @@ restart:
|
|||
found_valid_logicalslot |=
|
||||
(SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
|
||||
|
||||
if ((s->active_pid == MyProcPid &&
|
||||
if ((s->active_proc == MyProcNumber &&
|
||||
(!synced_only || s->data.synced)))
|
||||
{
|
||||
Assert(s->data.persistency == RS_TEMPORARY);
|
||||
|
|
@ -1088,7 +1091,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
|
|||
bool fail_softly = slot->data.persistency != RS_PERSISTENT;
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
slot->active_pid = 0;
|
||||
slot->active_proc = INVALID_PROC_NUMBER;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
/* wake up anyone waiting on this slot */
|
||||
|
|
@ -1110,7 +1113,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
|
|||
* Also wake up processes waiting for it.
|
||||
*/
|
||||
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
|
||||
slot->active_pid = 0;
|
||||
slot->active_proc = INVALID_PROC_NUMBER;
|
||||
slot->in_use = false;
|
||||
LWLockRelease(ReplicationSlotControlLock);
|
||||
ConditionVariableBroadcast(&slot->active_cv);
|
||||
|
|
@ -1476,7 +1479,7 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
|
|||
/* count slots with spinlock held */
|
||||
SpinLockAcquire(&s->mutex);
|
||||
(*nslots)++;
|
||||
if (s->active_pid != 0)
|
||||
if (s->active_proc != INVALID_PROC_NUMBER)
|
||||
(*nactive)++;
|
||||
SpinLockRelease(&s->mutex);
|
||||
}
|
||||
|
|
@ -1520,7 +1523,7 @@ restart:
|
|||
{
|
||||
ReplicationSlot *s;
|
||||
char *slotname;
|
||||
int active_pid;
|
||||
ProcNumber active_proc;
|
||||
|
||||
s = &ReplicationSlotCtl->replication_slots[i];
|
||||
|
||||
|
|
@ -1550,11 +1553,11 @@ restart:
|
|||
SpinLockAcquire(&s->mutex);
|
||||
/* can't change while ReplicationSlotControlLock is held */
|
||||
slotname = NameStr(s->data.name);
|
||||
active_pid = s->active_pid;
|
||||
if (active_pid == 0)
|
||||
active_proc = s->active_proc;
|
||||
if (active_proc == INVALID_PROC_NUMBER)
|
||||
{
|
||||
MyReplicationSlot = s;
|
||||
s->active_pid = MyProcPid;
|
||||
s->active_proc = MyProcNumber;
|
||||
}
|
||||
SpinLockRelease(&s->mutex);
|
||||
|
||||
|
|
@ -1579,11 +1582,11 @@ restart:
|
|||
* XXX: We can consider shutting down the slot sync worker before
|
||||
* trying to drop synced temporary slots here.
|
||||
*/
|
||||
if (active_pid)
|
||||
if (active_proc != INVALID_PROC_NUMBER)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_OBJECT_IN_USE),
|
||||
errmsg("replication slot \"%s\" is active for PID %d",
|
||||
slotname, active_pid)));
|
||||
slotname, GetPGProcByNumber(active_proc)->pid)));
|
||||
|
||||
/*
|
||||
* To avoid duplicating ReplicationSlotDropAcquired() and to avoid
|
||||
|
|
@ -1974,6 +1977,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
|
|||
{
|
||||
XLogRecPtr restart_lsn;
|
||||
NameData slotname;
|
||||
ProcNumber active_proc;
|
||||
int active_pid = 0;
|
||||
ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
|
||||
TimestampTz now = 0;
|
||||
|
|
@ -2027,7 +2031,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
|
|||
}
|
||||
|
||||
slotname = s->data.name;
|
||||
active_pid = s->active_pid;
|
||||
active_proc = s->active_proc;
|
||||
|
||||
/*
|
||||
* If the slot can be acquired, do so and mark it invalidated
|
||||
|
|
@ -2039,10 +2043,10 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
|
|||
* is terminated. So, the inactive slot can only be invalidated
|
||||
* immediately without being terminated.
|
||||
*/
|
||||
if (active_pid == 0)
|
||||
if (active_proc == INVALID_PROC_NUMBER)
|
||||
{
|
||||
MyReplicationSlot = s;
|
||||
s->active_pid = MyProcPid;
|
||||
s->active_proc = MyProcNumber;
|
||||
s->data.invalidated = invalidation_cause;
|
||||
|
||||
/*
|
||||
|
|
@ -2058,6 +2062,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
|
|||
/* Let caller know */
|
||||
invalidated = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
active_pid = GetPGProcByNumber(active_proc)->pid;
|
||||
Assert(active_pid != 0);
|
||||
}
|
||||
|
||||
SpinLockRelease(&s->mutex);
|
||||
|
||||
|
|
@ -2073,7 +2082,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
|
|||
&slot_idle_usecs);
|
||||
}
|
||||
|
||||
if (active_pid != 0)
|
||||
if (active_proc != INVALID_PROC_NUMBER)
|
||||
{
|
||||
/*
|
||||
* Prepare the sleep on the slot's condition variable before
|
||||
|
|
@ -2107,7 +2116,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
|
|||
if (MyBackendType == B_STARTUP)
|
||||
(void) SendProcSignal(active_pid,
|
||||
PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
|
||||
INVALID_PROC_NUMBER);
|
||||
active_proc);
|
||||
else
|
||||
(void) kill(active_pid, SIGTERM);
|
||||
|
||||
|
|
@ -2875,7 +2884,7 @@ RestoreSlotFromDisk(const char *name)
|
|||
slot->candidate_restart_valid = InvalidXLogRecPtr;
|
||||
|
||||
slot->in_use = true;
|
||||
slot->active_pid = 0;
|
||||
slot->active_proc = INVALID_PROC_NUMBER;
|
||||
|
||||
/*
|
||||
* Set the time since the slot has become inactive after loading the
|
||||
|
|
@ -3158,7 +3167,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
|
|||
SpinLockAcquire(&slot->mutex);
|
||||
restart_lsn = slot->data.restart_lsn;
|
||||
invalidated = slot->data.invalidated != RS_INVAL_NONE;
|
||||
inactive = slot->active_pid == 0;
|
||||
inactive = slot->active_proc == INVALID_PROC_NUMBER;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
|
||||
if (invalidated)
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@
|
|||
#include "replication/logical.h"
|
||||
#include "replication/slot.h"
|
||||
#include "replication/slotsync.h"
|
||||
#include "storage/proc.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/pg_lsn.h"
|
||||
|
|
@ -309,10 +310,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
|||
values[i++] = ObjectIdGetDatum(slot_contents.data.database);
|
||||
|
||||
values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
|
||||
values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
|
||||
values[i++] = BoolGetDatum(slot_contents.active_proc != INVALID_PROC_NUMBER);
|
||||
|
||||
if (slot_contents.active_pid != 0)
|
||||
values[i++] = Int32GetDatum(slot_contents.active_pid);
|
||||
if (slot_contents.active_proc != INVALID_PROC_NUMBER)
|
||||
values[i++] = Int32GetDatum(GetPGProcByNumber(slot_contents.active_proc)->pid);
|
||||
else
|
||||
nulls[i++] = true;
|
||||
|
||||
|
|
@ -377,13 +378,13 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
|
|||
*/
|
||||
if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
|
||||
{
|
||||
int pid;
|
||||
ProcNumber procno;
|
||||
|
||||
SpinLockAcquire(&slot->mutex);
|
||||
pid = slot->active_pid;
|
||||
procno = slot->active_proc;
|
||||
slot_contents.data.restart_lsn = slot->data.restart_lsn;
|
||||
SpinLockRelease(&slot->mutex);
|
||||
if (pid != 0)
|
||||
if (procno != INVALID_PROC_NUMBER)
|
||||
{
|
||||
values[i++] = CStringGetTextDatum("unreserved");
|
||||
walstate = WALAVAIL_UNRESERVED;
|
||||
|
|
|
|||
|
|
@ -185,8 +185,11 @@ typedef struct ReplicationSlot
|
|||
/* is this slot defined */
|
||||
bool in_use;
|
||||
|
||||
/* Who is streaming out changes for this slot? 0 in unused slots. */
|
||||
pid_t active_pid;
|
||||
/*
|
||||
* Who is streaming out changes for this slot? INVALID_PROC_NUMBER in
|
||||
* unused slots.
|
||||
*/
|
||||
ProcNumber active_proc;
|
||||
|
||||
/* any outstanding modifications? */
|
||||
bool just_dirtied;
|
||||
|
|
|
|||
Loading…
Reference in a new issue