postgresql/src/backend/executor/tqueue.c

/*-------------------------------------------------------------------------
 *
 * tqueue.c
 *    Use shm_mq to send & receive tuples between parallel backends
 *
 * A DestReceiver of type DestTupleQueue, which is a TQueueDestReceiver
 * under the hood, writes tuples from the executor to a shm_mq.
 *
 * A TupleQueueFunnel helps manage the process of reading tuples from
 * one or more shm_mq objects being used as tuple queues.
 *
 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/executor/tqueue.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "access/htup_details.h"
#include "executor/tqueue.h"
#include "miscadmin.h"

typedef struct
{
    DestReceiver pub;           /* publicly-visible DestReceiver fields */
    shm_mq_handle *handle;      /* shm_mq to which tuples are written */
} TQueueDestReceiver;

struct TupleQueueFunnel
{
    int         nqueues;        /* number of queues still being read */
    int         maxqueues;      /* allocated length of queue array */
    int         nextqueue;      /* next queue to try, for round-robin reads */
    shm_mq_handle **queue;      /* array of tuple queue handles */
};

/*
 * Receive a tuple from the executor and send it to the tuple queue.
 */
static void
tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
{
    TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
    HeapTuple   tuple;

    tuple = ExecMaterializeSlot(slot);
    shm_mq_send(tqueue->handle, tuple->t_len, tuple->t_data, false);
}

/*
 * Prepare to receive tuples from the executor.
 */
static void
tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
{
    /* do nothing */
}

/*
 * Clean up at end of an executor run.
 */
static void
tqueueShutdownReceiver(DestReceiver *self)
{
    /* do nothing */
}

/*
 * Destroy receiver when done with it.
 */
static void
tqueueDestroyReceiver(DestReceiver *self)
{
    pfree(self);
}

/*
 * Create a DestReceiver that writes tuples to a tuple queue.
 */
DestReceiver *
CreateTupleQueueDestReceiver(shm_mq_handle *handle)
{
    TQueueDestReceiver *self;

    self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));

    self->pub.receiveSlot = tqueueReceiveSlot;
    self->pub.rStartup = tqueueStartupReceiver;
    self->pub.rShutdown = tqueueShutdownReceiver;
    self->pub.rDestroy = tqueueDestroyReceiver;
    self->pub.mydest = DestTupleQueue;
    self->handle = handle;

    return (DestReceiver *) self;
}
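
/*
 * Illustrative worker-side usage (a sketch, not code from this file):
 * assuming "mqh" is a shm_mq_handle attached to a queue the leader will
 * read, and "slot" holds an output tuple, a worker might write:
 *
 *    DestReceiver *dest = CreateTupleQueueDestReceiver(mqh);
 *
 *    dest->rStartup(dest, (int) CMD_SELECT, slot->tts_tupleDescriptor);
 *    dest->receiveSlot(slot, dest);          (once per tuple)
 *    dest->rShutdown(dest);
 *    dest->rDestroy(dest);
 *
 * rStartup and rShutdown are no-ops for this receiver type, but calling
 * them keeps the caller correct for any DestReceiver.
 */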

/*
 * Create a tuple queue funnel.
 */
TupleQueueFunnel *
CreateTupleQueueFunnel(void)
{
    TupleQueueFunnel *funnel = palloc0(sizeof(TupleQueueFunnel));

    /* Start small; RegisterTupleQueueOnFunnel grows the array as needed. */
    funnel->maxqueues = 8;
    funnel->queue = palloc(funnel->maxqueues * sizeof(shm_mq_handle *));

    return funnel;
}

/*
 * Destroy a tuple queue funnel, detaching from any queues still attached.
 */
void
DestroyTupleQueueFunnel(TupleQueueFunnel *funnel)
{
    int         i;

    for (i = 0; i < funnel->nqueues; i++)
        shm_mq_detach(shm_mq_get_queue(funnel->queue[i]));
    pfree(funnel->queue);
    pfree(funnel);
}

/*
 * Remember the shared memory queue handle in the funnel, enlarging the
 * array of queues if necessary.
 */
void
RegisterTupleQueueOnFunnel(TupleQueueFunnel *funnel, shm_mq_handle *handle)
{
    /* Fast path: we already have room for another queue. */
    if (funnel->nqueues < funnel->maxqueues)
    {
        funnel->queue[funnel->nqueues++] = handle;
        return;
    }

    /* Otherwise, double the size of the array. */
    if (funnel->nqueues >= funnel->maxqueues)
    {
        int         newsize = funnel->nqueues * 2;

        Assert(funnel->nqueues == funnel->maxqueues);

        funnel->queue = repalloc(funnel->queue,
                                 newsize * sizeof(shm_mq_handle *));
        funnel->maxqueues = newsize;
    }

    funnel->queue[funnel->nqueues++] = handle;
}
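
/*
 * Illustrative leader-side setup (a sketch; "mqh" and "nworkers" are
 * hypothetical): after launching the workers, each with its own queue,
 * the leader might do:
 *
 *    TupleQueueFunnel *funnel = CreateTupleQueueFunnel();
 *    int         i;
 *
 *    for (i = 0; i < nworkers; ++i)
 *        RegisterTupleQueueOnFunnel(funnel, mqh[i]);
 *
 * The funnel starts with room for 8 handles and doubles the array as
 * needed, so any number of queues may be registered.
 */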

/*
 * Fetch a tuple from a tuple queue funnel.
 *
 * We try to read from the queues in round-robin fashion so as to avoid
 * the situation where some workers get their tuples read promptly while
 * others are barely ever serviced.
 *
 * Even when nowait = false, we read from the individual queues in
 * non-blocking mode.  shm_mq_receive() can accumulate bytes from a
 * partially-read message even when it returns SHM_MQ_WOULD_BLOCK, so
 * reading this way should outperform doing a blocking read on each queue
 * in turn.
 *
 * The return value is NULL if there are no remaining queues, or if
 * nowait = true and no queue returned a tuple without blocking.  *done,
 * if not NULL, is set to true when there are no remaining queues and
 * false in any other case.
 */
HeapTuple
TupleQueueFunnelNext(TupleQueueFunnel *funnel, bool nowait, bool *done)
{
    int         waitpos = funnel->nextqueue;

    /* Corner case: called before adding any queues, or after all are gone. */
    if (funnel->nqueues == 0)
    {
        if (done != NULL)
            *done = true;
        return NULL;
    }

    if (done != NULL)
        *done = false;

    for (;;)
    {
        shm_mq_handle *mqh = funnel->queue[funnel->nextqueue];
        shm_mq_result result;
        Size        nbytes;
        void       *data;

        /* Attempt to read a message. */
        result = shm_mq_receive(mqh, &nbytes, &data, true);

        /*
         * Normally, we advance funnel->nextqueue to the next queue at this
         * point, but if we've just discovered that the current queue is
         * detached, then forget that queue and leave the pointer where it
         * is until the number of remaining queues falls below that pointer;
         * at that point, reset the pointer to the first queue.
         */
        if (result != SHM_MQ_DETACHED)
            funnel->nextqueue = (funnel->nextqueue + 1) % funnel->nqueues;
        else
        {
            --funnel->nqueues;
            if (funnel->nqueues == 0)
            {
                if (done != NULL)
                    *done = true;
                return NULL;
            }

            /* Close up the hole left by the detached queue. */
            memmove(&funnel->queue[funnel->nextqueue],
                    &funnel->queue[funnel->nextqueue + 1],
                    sizeof(shm_mq_handle *)
                    * (funnel->nqueues - funnel->nextqueue));
            if (funnel->nextqueue >= funnel->nqueues)
                funnel->nextqueue = 0;
            if (funnel->nextqueue < waitpos)
                --waitpos;
            continue;
        }

        /* If we got a message, return it. */
        if (result == SHM_MQ_SUCCESS)
        {
            HeapTupleData htup;

            /*
             * The tuple data we just read is only valid until our next
             * attempt to read from this queue.  Copy the tuple into a
             * single palloc'd chunk, as callers will expect.
             */
            ItemPointerSetInvalid(&htup.t_self);
            htup.t_tableOid = InvalidOid;
            htup.t_len = nbytes;
            htup.t_data = data;

            return heap_copytuple(&htup);
        }

        /*
         * If we've visited all of the queues, then we should either give up
         * and return NULL (if we're in non-blocking mode) or wait for the
         * process latch to be set (otherwise).
         */
        if (funnel->nextqueue == waitpos)
        {
            if (nowait)
                return NULL;
            WaitLatch(MyLatch, WL_LATCH_SET, 0);
            CHECK_FOR_INTERRUPTS();
            ResetLatch(MyLatch);
        }
    }
}
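
/*
 * Illustrative read loop for the leader (a sketch; process_tuple is a
 * hypothetical consumer):
 *
 *    for (;;)
 *    {
 *        bool        done;
 *        HeapTuple   tup = TupleQueueFunnelNext(funnel, false, &done);
 *
 *        if (done)
 *            break;                  (every worker has detached)
 *        if (tup != NULL)
 *            process_tuple(tup);     (caller owns the palloc'd copy)
 *    }
 *    DestroyTupleQueueFunnel(funnel);
 *
 * With nowait = false, a NULL return implies *done is set, so the tuple
 * test is merely defensive.
 */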