/*-------------------------------------------------------------------------
*
* nodeHash.c
* Routines to hash relations for hashjoin
*
* Copyright (c) 1994, Regents of the University of California
*
*
* $Id: nodeHash.c,v 1.39 1999/10/13 15:02:25 tgl Exp $
*
*-------------------------------------------------------------------------
*/
/*
* INTERFACE ROUTINES
* ExecHash - generate an in-memory hash table of the relation
* ExecInitHash - initialize node and subnodes
* ExecEndHash - shut down node and subnodes
*
*/
#include <sys/types.h>
#include <math.h>
#include "postgres.h"
#include "executor/execdebug.h"
#include "executor/executor.h"
#include "executor/nodeHash.h"
#include "executor/nodeHashjoin.h"
#include "utils/portal.h"
extern int SortMem;
static int hashFunc(Datum key, int len, bool byVal);
/* ----------------------------------------------------------------
* ExecHash
*
* build hash table for hashjoin, also doing partitioning if more
* than one batch is required.
* ----------------------------------------------------------------
*/
TupleTableSlot *
ExecHash(Hash *node)
{
EState *estate;
HashState *hashstate;
Plan *outerNode;
Var *hashkey;
HashJoinTable hashtable;
TupleTableSlot *slot;
ExprContext *econtext;
int nbatch;
int i;
/* ----------------
* get state info from node
* ----------------
*/
hashstate = node->hashstate;
estate = node->plan.state;
outerNode = outerPlan(node);
hashtable = hashstate->hashtable;
if (hashtable == NULL)
elog(ERROR, "ExecHash: hash table is NULL.");
nbatch = hashtable->nbatch;
if (nbatch > 0)
{
/* ----------------
* Open temp files for inner batches, if needed.
* Note that file buffers are palloc'd in regular executor context.
* ----------------
*/
for (i = 0; i < nbatch; i++)
hashtable->innerBatchFile[i] = BufFileCreateTemp();
}
/* ----------------
* set expression context
* ----------------
*/
hashkey = node->hashkey;
econtext = hashstate->cstate.cs_ExprContext;
/* ----------------
* get all inner tuples and insert into the hash table (or temp files)
* ----------------
*/
for (;;)
{
slot = ExecProcNode(outerNode, (Plan *) node);
if (TupIsNull(slot))
break;
econtext->ecxt_innertuple = slot;
ExecHashTableInsert(hashtable, econtext, hashkey);
ExecClearTuple(slot);
}
/* ---------------------
* Return the slot so that we have the tuple descriptor
* when we need to save/restore them. -Jeff 11 July 1991
* ---------------------
*/
return slot;
}
/* ----------------------------------------------------------------
* ExecInitHash
*
* Init routine for Hash node
* ----------------------------------------------------------------
*/
bool
ExecInitHash(Hash *node, EState *estate, Plan *parent)
{
HashState *hashstate;
Plan *outerPlan;
SO1_printf("ExecInitHash: %s\n",
"initializing hash node");
/* ----------------
* assign the node's execution state
* ----------------
*/
node->plan.state = estate;
/* ----------------
* create state structure
* ----------------
*/
hashstate = makeNode(HashState);
node->hashstate = hashstate;
hashstate->hashtable = NULL;
/* ----------------
* Miscellaneous initialization
*
* + assign node's base_id
* + assign debugging hooks and
* + create expression context for node
* ----------------
*/
ExecAssignNodeBaseInfo(estate, &hashstate->cstate, parent);
ExecAssignExprContext(estate, &hashstate->cstate);
/* ----------------
* initialize our result slot
* ----------------
*/
ExecInitResultTupleSlot(estate, &hashstate->cstate);
/* ----------------
* initialize child nodes
* ----------------
*/
outerPlan = outerPlan(node);
ExecInitNode(outerPlan, estate, (Plan *) node);
/* ----------------
* initialize tuple type. no need to initialize projection
* info because this node doesn't do projections
* ----------------
*/
ExecAssignResultTypeFromOuterPlan((Plan *) node, &hashstate->cstate);
hashstate->cstate.cs_ProjInfo = NULL;
return TRUE;
}
int
ExecCountSlotsHash(Hash *node)
{
#define HASH_NSLOTS 1
return ExecCountSlotsNode(outerPlan(node)) +
ExecCountSlotsNode(innerPlan(node)) +
HASH_NSLOTS;
}
/* ---------------------------------------------------------------
* ExecEndHash
*
* clean up routine for Hash node
* ----------------------------------------------------------------
*/
void
ExecEndHash(Hash *node)
{
HashState *hashstate;
Plan *outerPlan;
/* ----------------
* get info from the hash state
* ----------------
*/
hashstate = node->hashstate;
/* ----------------
* free projection info. no need to free result type info
* because that came from the outer plan...
* ----------------
*/
ExecFreeProjectionInfo(&hashstate->cstate);
/* ----------------
* shut down the subplan
* ----------------
*/
outerPlan = outerPlan(node);
ExecEndNode(outerPlan, (Plan *) node);
}
/* ----------------------------------------------------------------
* ExecHashTableCreate
*
* create an empty hashtable data structure for hashjoin.
* (The table lives in backend-private portal memory, not shared memory.)
* ----------------------------------------------------------------
*/
#define NTUP_PER_BUCKET 10
#define FUDGE_FAC 2.0
HashJoinTable
ExecHashTableCreate(Hash *node)
{
Plan *outerNode;
int ntuples;
int tupsize;
double inner_rel_bytes;
double hash_table_bytes;
int nbatch;
HashJoinTable hashtable;
int nbuckets;
int totalbuckets;
int bucketsize;
int i;
Portal myPortal;
char myPortalName[64];
MemoryContext oldcxt;
/* ----------------
* Get information about the size of the relation to be hashed
* (it's the "outer" subtree of this node, but the inner relation of
* the hashjoin).
* Caution: this is only the planner's estimates, and so
* can't be trusted too far. Apply a healthy fudge factor.
* ----------------
*/
outerNode = outerPlan(node);
ntuples = outerNode->plan_size;
if (ntuples <= 0) /* force a plausible size if no info */
ntuples = 1000;
/*
* estimate tupsize based on footprint of tuple in hashtable... but
* what about palloc overhead?
*/
tupsize = MAXALIGN(outerNode->plan_width) +
MAXALIGN(sizeof(HashJoinTupleData));
inner_rel_bytes = (double) ntuples * tupsize * FUDGE_FAC;
/*
* Target hashtable size is SortMem kilobytes, but not less than
* sqrt(estimated inner rel size), so as to avoid horrible
* performance.
*/
hash_table_bytes = sqrt(inner_rel_bytes);
if (hash_table_bytes < (SortMem * 1024L))
hash_table_bytes = SortMem * 1024L;
/*
* Count the number of hash buckets we want for the whole relation,
* for an average bucket load of NTUP_PER_BUCKET (per virtual
* bucket!).
*/
totalbuckets = (int) ceil((double) ntuples * FUDGE_FAC / NTUP_PER_BUCKET);
/*
* Count the number of buckets we think will actually fit in the
* target memory size, at a loading of NTUP_PER_BUCKET (physical
* buckets). NOTE: FUDGE_FAC here determines the fraction of the
* hashtable space reserved to allow for nonuniform distribution of
* hash values. Perhaps this should be a different number from the
* other uses of FUDGE_FAC, but since we have no real good way to pick
* either one...
*/
bucketsize = NTUP_PER_BUCKET * tupsize;
nbuckets = (int) (hash_table_bytes / (bucketsize * FUDGE_FAC));
if (nbuckets <= 0)
nbuckets = 1;
if (totalbuckets <= nbuckets)
{
/*
* We have enough space, so no batching. In theory we could even
* reduce nbuckets, but since that could lead to poor behavior if
* estimated ntuples is much less than reality, it seems better to
* make more buckets instead of fewer.
*/
totalbuckets = nbuckets;
nbatch = 0;
}
else
{
/*
* Need to batch; compute how many batches we want to use. Note
* that nbatch doesn't have to have anything to do with the ratio
* totalbuckets/nbuckets; in fact, it is the number of groups we
* will use for the part of the data that doesn't fall into the
* first nbuckets hash buckets.
*/
nbatch = (int) ceil((inner_rel_bytes - hash_table_bytes) /
hash_table_bytes);
if (nbatch <= 0)
nbatch = 1;
}
/*
* Now, totalbuckets is the number of (virtual) hashbuckets for the
* whole relation, and nbuckets is the number of physical hashbuckets
* we will use in the first pass. Data falling into the first
* nbuckets virtual hashbuckets gets handled in the first pass;
* everything else gets divided into nbatch batches to be processed in
* additional passes.
*/
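/*
 * Worked example with purely illustrative numbers (not planner output):
 * suppose ntuples = 100000, tupsize = 64 and SortMem = 512 (kB). Then
 * inner_rel_bytes = 100000 * 64 * 2.0 = 12800000, and since
 * sqrt(12800000) ~= 3578 is less than 512 * 1024, hash_table_bytes =
 * 524288. That gives totalbuckets = ceil(100000 * 2.0 / 10) = 20000
 * and nbuckets = (int) (524288 / (640 * 2.0)) = 409. Because
 * totalbuckets > nbuckets, we must batch, with
 * nbatch = ceil((12800000 - 524288) / 524288) = 24.
 */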
#ifdef HJDEBUG
printf("nbatch = %d, totalbuckets = %d, nbuckets = %d\n",
nbatch, totalbuckets, nbuckets);
#endif
/* ----------------
* Initialize the hash table control block.
* The hashtable control block is just palloc'd from executor memory.
* ----------------
*/
hashtable = (HashJoinTable) palloc(sizeof(HashTableData));
hashtable->nbuckets = nbuckets;
hashtable->totalbuckets = totalbuckets;
hashtable->buckets = NULL;
hashtable->nbatch = nbatch;
hashtable->curbatch = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
hashtable->innerBatchSize = NULL;
hashtable->outerBatchSize = NULL;
/* ----------------
* Create a named portal in which to keep the hashtable working storage.
* Each hashjoin must have its own portal, so be wary of name conflicts.
* ----------------
*/
i = 0;
do
{
i++;
sprintf(myPortalName, "<hashtable %d>", i);
myPortal = GetPortalByName(myPortalName);
} while (PortalIsValid(myPortal));
myPortal = CreatePortal(myPortalName);
Assert(PortalIsValid(myPortal));
hashtable->myPortal = (void *) myPortal; /* kluge for circular
* includes */
hashtable->hashCxt = (MemoryContext) PortalGetVariableMemory(myPortal);
hashtable->batchCxt = (MemoryContext) PortalGetHeapMemory(myPortal);
/* Allocate data that will live for the life of the hashjoin */
oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
if (nbatch > 0)
{
/* ---------------
* allocate and initialize the file arrays in hashCxt
* ---------------
*/
hashtable->innerBatchFile = (BufFile **)
palloc(nbatch * sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
palloc(nbatch * sizeof(BufFile *));
hashtable->innerBatchSize = (long *)
palloc(nbatch * sizeof(long));
hashtable->outerBatchSize = (long *)
palloc(nbatch * sizeof(long));
for (i = 0; i < nbatch; i++)
{
hashtable->innerBatchFile[i] = NULL;
hashtable->outerBatchFile[i] = NULL;
hashtable->innerBatchSize[i] = 0;
hashtable->outerBatchSize[i] = 0;
}
/* The files will not be opened until later... */
}
/*
* Prepare portal for the first-scan space allocations; allocate the
* hashbucket array therein, and set each bucket "empty".
*/
MemoryContextSwitchTo(hashtable->batchCxt);
StartPortalAllocMode(DefaultAllocMode, 0);
hashtable->buckets = (HashJoinTuple *)
palloc(nbuckets * sizeof(HashJoinTuple));
if (hashtable->buckets == NULL)
elog(ERROR, "Insufficient memory for hash table.");
for (i = 0; i < nbuckets; i++)
hashtable->buckets[i] = NULL;
MemoryContextSwitchTo(oldcxt);
return hashtable;
}
/* ----------------------------------------------------------------
* ExecHashTableDestroy
*
* destroy a hash table
* ----------------------------------------------------------------
*/
void
ExecHashTableDestroy(HashJoinTable hashtable)
{
int i;
/* Make sure all the temp files are closed */
for (i = 0; i < hashtable->nbatch; i++)
{
if (hashtable->innerBatchFile[i])
BufFileClose(hashtable->innerBatchFile[i]);
if (hashtable->outerBatchFile[i])
BufFileClose(hashtable->outerBatchFile[i]);
}
/* Destroy the portal to release all working memory */
/* cast here is a kluge for circular includes... */
PortalDestroy((Portal *) &hashtable->myPortal);
/* And drop the control block */
pfree(hashtable);
}
/* ----------------------------------------------------------------
* ExecHashTableInsert
*
* insert a tuple into the hash table, depending on the hash value;
* it may instead be written to a tmp file for a later batch
* ----------------------------------------------------------------
*/
void
ExecHashTableInsert(HashJoinTable hashtable,
ExprContext *econtext,
Var *hashkey)
{
int bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
TupleTableSlot *slot = econtext->ecxt_innertuple;
HeapTuple heapTuple = slot->val;
/* ----------------
* decide whether to put the tuple in the hash table or a tmp file
* ----------------
*/
if (bucketno < hashtable->nbuckets)
{
/* ---------------
* put the tuple in hash table
* ---------------
*/
HashJoinTuple hashTuple;
int hashTupleSize;
hashTupleSize = MAXALIGN(sizeof(*hashTuple)) + heapTuple->t_len;
hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
hashTupleSize);
if (hashTuple == NULL)
elog(ERROR, "Insufficient memory for hash table.");
memcpy((char *) &hashTuple->htup,
(char *) heapTuple,
sizeof(hashTuple->htup));
hashTuple->htup.t_data = (HeapTupleHeader)
(((char *) hashTuple) + MAXALIGN(sizeof(*hashTuple)));
memcpy((char *) hashTuple->htup.t_data,
(char *) heapTuple->t_data,
heapTuple->t_len);
hashTuple->next = hashtable->buckets[bucketno];
hashtable->buckets[bucketno] = hashTuple;
}
else
{
/* -----------------
* put the tuple into a tmp file for other batches
* -----------------
*/
int batchno = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) /
(hashtable->totalbuckets - hashtable->nbuckets);
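/*
 * Illustrative numbers only: with nbatch = 24, nbuckets = 409 and
 * totalbuckets = 20000 (as in the example in ExecHashTableCreate),
 * a tuple landing in virtual bucket 10000 is saved to batch
 * (24 * (10000 - 409)) / (20000 - 409) = 11.
 */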
hashtable->innerBatchSize[batchno]++;
ExecHashJoinSaveTuple(heapTuple,
hashtable->innerBatchFile[batchno]);
}
}
/* ----------------------------------------------------------------
* ExecHashGetBucket
*
* Compute the (virtual) bucket number for a tuple's hash key
* ----------------------------------------------------------------
*/
int
ExecHashGetBucket(HashJoinTable hashtable,
ExprContext *econtext,
Var *hashkey)
{
int bucketno;
Datum keyval;
bool isNull;
/* ----------------
* Get the join attribute value of the tuple
*
* ...It's a quick hack - use ExecEvalExpr instead of ExecEvalVar:
* hashkey may be T_ArrayRef, not just T_Var. - vadim 04/22/97
* ----------------
*/
keyval = ExecEvalExpr((Node *) hashkey, econtext, &isNull, NULL);
/*
* keyval could be null, so we'd better point it to something valid
* before trying to run hashFunc on it. --djm 8/17/96
*/
if (isNull)
{
execConstByVal = 0;
execConstLen = 0;
keyval = (Datum) "";
}
/* ------------------
* compute the hash function
* ------------------
*/
bucketno = hashFunc(keyval, execConstLen, execConstByVal) % hashtable->totalbuckets;
#ifdef HJDEBUG
if (bucketno >= hashtable->nbuckets)
printf("hash(%d) = %d SAVED\n", keyval, bucketno);
else
printf("hash(%d) = %d\n", keyval, bucketno);
#endif
return bucketno;
}
/* ----------------------------------------------------------------
* ExecScanHashBucket
*
* scan a hash bucket for matches to the current outer tuple
* ----------------------------------------------------------------
*/
HeapTuple
ExecScanHashBucket(HashJoinState *hjstate,
List *hjclauses,
ExprContext *econtext)
{
HashJoinTable hashtable = hjstate->hj_HashTable;
HashJoinTuple hashTuple = hjstate->hj_CurTuple;
/*
* hj_CurTuple is NULL to start scanning a new bucket, or the address
* of the last tuple returned from the current bucket.
*/
if (hashTuple == NULL)
hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
else
hashTuple = hashTuple->next;
while (hashTuple != NULL)
{
HeapTuple heapTuple = &hashTuple->htup;
TupleTableSlot *inntuple;
bool qualResult;
/* insert hashtable's tuple into exec slot so ExecQual sees it */
inntuple = ExecStoreTuple(heapTuple, /* tuple to store */
hjstate->hj_HashTupleSlot, /* slot */
InvalidBuffer,
false); /* do not pfree this tuple */
econtext->ecxt_innertuple = inntuple;
qualResult = ExecQual(hjclauses, econtext);
if (qualResult)
{
hjstate->hj_CurTuple = hashTuple;
return heapTuple;
}
hashTuple = hashTuple->next;
}
/* ----------------
* no match
* ----------------
*/
return NULL;
}
/* ----------------------------------------------------------------
* hashFunc
*
* the hash function, copied from Margo
* ----------------------------------------------------------------
*/
static int
hashFunc(Datum key, int len, bool byVal)
{
unsigned int h = 0;
unsigned char *k;
if (byVal)
{
/*
* If it's a by-value data type, use the 'len' least significant
* bytes of the Datum value. This should do the right thing on
* either bigendian or littleendian hardware --- see the Datum
* access macros in c.h.
*/
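/*
 * For example (illustrative value only): an int4 key 0x01020304 with
 * len = 4 is mixed into 'h' by the loop below low-order byte first:
 * 0x04, then 0x03, 0x02, 0x01.
 */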
while (len-- > 0)
{
h = (h * PRIME1) ^ (key & 0xFF);
key >>= 8;
}
}
else
{
/*
* If this is a variable length type, then 'key' points to a "struct
* varlena" and len == -1. NOTE: VARSIZE returns the "real" data
* length plus the size of the "vl_len" attribute of varlena (the
* length information). 'key' points to the beginning of the varlena
* struct, so we have to use "VARDATA" to find the beginning of
* the "real" data.
*/
if (len == -1)
{
len = VARSIZE(key) - VARHDRSZ;
k = (unsigned char *) VARDATA(key);
}
else
k = (unsigned char *) key;
while (len-- > 0)
h = (h * PRIME1) ^ (*k++);
}
return h % PRIME2;
}
/* ----------------------------------------------------------------
* ExecHashTableReset
*
* reset hash table header for new batch
*
* ntuples is the number of tuples in the inner relation's batch
* (which we currently don't actually use...)
* ----------------------------------------------------------------
*/
void
ExecHashTableReset(HashJoinTable hashtable, long ntuples)
{
MemoryContext oldcxt;
int nbuckets = hashtable->nbuckets;
int i;
/*
* Release all the hash buckets and tuples acquired in the prior pass,
* and reinitialize the portal for a new pass.
*/
oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
EndPortalAllocMode();
StartPortalAllocMode(DefaultAllocMode, 0);
/*
* We still use the same number of physical buckets as in the first
* pass. (It could be different; but we already decided how many
* buckets would be appropriate for the allowed memory, so stick with
* that number.) We MUST set totalbuckets to equal nbuckets, because
* from now on no tuples will go out to temp files; there are no more
* virtual buckets, only real buckets. (This implies that tuples will
* go into different bucket numbers than they did on the first pass,
* but that's OK.)
*/
hashtable->totalbuckets = nbuckets;
/* Reallocate and reinitialize the hash bucket headers. */
hashtable->buckets = (HashJoinTuple *)
palloc(nbuckets * sizeof(HashJoinTuple));
if (hashtable->buckets == NULL)
elog(ERROR, "Insufficient memory for hash table.");
for (i = 0; i < nbuckets; i++)
hashtable->buckets[i] = NULL;
MemoryContextSwitchTo(oldcxt);
}
void
ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent)
{
/*
* If chgParam of the subnode is not null, the plan will be re-scanned
* by the first ExecProcNode.
*/
if (((Plan *) node)->lefttree->chgParam == NULL)
ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
}