diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 61480733a1..2dad0c8a55 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1627,6 +1627,23 @@ ExecHashTableInsert(HashJoinTable hashtable, { bool shouldFree; MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); + + ExecHashTableInsertTuple(hashtable, tuple, hashvalue); + + if (shouldFree) + heap_free_minimal_tuple(tuple); +} + +/* + * ExecHashTableInsertTuple + * insert a minimal tuple into the hash table depending on the hash value + * it may just go to a temp file for later batches + */ +void +ExecHashTableInsertTuple(HashJoinTable hashtable, + MinimalTuple tuple, + uint32 hashvalue) +{ int bucketno; int batchno; @@ -1701,9 +1718,6 @@ ExecHashTableInsert(HashJoinTable hashtable, &hashtable->innerBatchFile[batchno], hashtable); } - - if (shouldFree) - heap_free_minimal_tuple(tuple); } /* @@ -1777,12 +1791,10 @@ retry: * tuples that belong in the current batch once growth has been disabled. 
*/ void -ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, - TupleTableSlot *slot, - uint32 hashvalue) +ExecParallelHashTableInsertCurrentBatchTuple(HashJoinTable hashtable, + MinimalTuple tuple, + uint32 hashvalue) { - bool shouldFree; - MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); HashJoinTuple hashTuple; dsa_pointer shared; int batchno; @@ -1798,6 +1810,21 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple)); ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], hashTuple, shared); +} + +/* + * Like ExecParallelHashTableInsertCurrentBatchTuple, + * but this function accepts a TupleTableSlot. + */ +void +ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, + TupleTableSlot *slot, + uint32 hashvalue) +{ + bool shouldFree; + MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); + + ExecParallelHashTableInsertCurrentBatchTuple(hashtable, tuple, hashvalue); if (shouldFree) heap_free_minimal_tuple(tuple); diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 5f4073eabd..002098f129 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -194,10 +194,10 @@ static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode, static TupleTableSlot *ExecParallelHashJoinOuterGetTuple(PlanState *outerNode, HashJoinState *hjstate, uint32 *hashvalue); -static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate, - BufFile *file, - uint32 *hashvalue, - TupleTableSlot *tupleSlot); +static MinimalTuple ExecHashJoinGetSavedTuple(HashJoinState *hjstate, + BufFile *file, + uint32 *hashvalue, + StringInfo buf); static bool ExecHashJoinNewBatch(HashJoinState *hjstate); static bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); static void ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate); @@ -831,6 +831,7 @@ ExecInitHashJoin(HashJoin *node, 
EState *estate, int eflags) */ hjstate->hj_HashTable = NULL; hjstate->hj_FirstOuterTupleSlot = NULL; + hjstate->hj_outerTupleBuffer = NULL; hjstate->hj_CurHashValue = 0; hjstate->hj_CurBucketNo = 0; @@ -936,6 +937,7 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, } else if (curbatch < hashtable->nbatch) { + MinimalTuple mtup; BufFile *file = hashtable->outerBatchFile[curbatch]; /* @@ -945,12 +947,23 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, if (file == NULL) return NULL; - slot = ExecHashJoinGetSavedTuple(hjstate, + if (unlikely(hjstate->hj_outerTupleBuffer == NULL)) + { + MemoryContext oldcontext = MemoryContextSwitchTo(GetMemoryChunkContext(hjstate)); + hjstate->hj_outerTupleBuffer = makeStringInfo(); + MemoryContextSwitchTo(oldcontext); + } + + mtup = ExecHashJoinGetSavedTuple(hjstate, file, hashvalue, - hjstate->hj_OuterTupleSlot); - if (!TupIsNull(slot)) + hjstate->hj_outerTupleBuffer); + if (likely(mtup != NULL)) + { + slot = hjstate->hj_OuterTupleSlot; + ExecForceStoreMinimalTuple(mtup, slot, false); return slot; + } } /* End of this batch */ @@ -1034,7 +1047,6 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) int nbatch; int curbatch; BufFile *innerFile; - TupleTableSlot *slot; uint32 hashvalue; nbatch = hashtable->nbatch; @@ -1125,21 +1137,25 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) if (innerFile != NULL) { + StringInfoData buf; + MinimalTuple tuple; + if (BufFileSeek(innerFile, 0, 0, SEEK_SET)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not rewind hash-join temporary file"))); - while ((slot = ExecHashJoinGetSavedTuple(hjstate, - innerFile, - &hashvalue, - hjstate->hj_HashTupleSlot))) + initStringInfo(&buf); + while ((tuple = ExecHashJoinGetSavedTuple(hjstate, + innerFile, + &hashvalue, + &buf))) { /* * NOTE: some tuples may be sent to future batches. Also, it is * possible for hashtable->nbatch to be increased here! 
*/ - ExecHashTableInsert(hashtable, slot, hashvalue); + ExecHashTableInsertTuple(hashtable, tuple, hashvalue); } /* @@ -1148,6 +1164,7 @@ ExecHashJoinNewBatch(HashJoinState *hjstate) */ BufFileClose(innerFile); hashtable->innerBatchFile[curbatch] = NULL; + pfree(buf.data); } /* @@ -1198,7 +1215,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) { uint32 hashvalue; MinimalTuple tuple; - TupleTableSlot *slot; if (!hashtable->batches[batchno].done) { @@ -1230,12 +1246,9 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) while ((tuple = sts_parallel_scan_next(inner_tuples, &hashvalue))) { - ExecForceStoreMinimalTuple(tuple, - hjstate->hj_HashTupleSlot, - false); - slot = hjstate->hj_HashTupleSlot; - ExecParallelHashTableInsertCurrentBatch(hashtable, slot, - hashvalue); + ExecParallelHashTableInsertCurrentBatchTuple(hashtable, + tuple, + hashvalue); } sts_end_parallel_scan(inner_tuples); BarrierArriveAndWait(batch_barrier, @@ -1349,14 +1362,14 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, * ExecHashJoinGetSavedTuple * read the next tuple from a batch file. Return NULL if no more. * - * On success, *hashvalue is set to the tuple's hash value, and the tuple - * itself is stored in the given slot. + * On success, *hashvalue is set to the tuple's hash value, and the tuple + * itself (stored in the given buf) is returned. */ -static TupleTableSlot * +static MinimalTuple ExecHashJoinGetSavedTuple(HashJoinState *hjstate, BufFile *file, uint32 *hashvalue, - TupleTableSlot *tupleSlot) + StringInfo buf) { uint32 header[2]; size_t nread; @@ -1375,19 +1388,19 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate, * cheating. 
*/ nread = BufFileReadMaybeEOF(file, header, sizeof(header), true); - if (nread == 0) /* end of file */ - { - ExecClearTuple(tupleSlot); + if (unlikely(nread == 0)) /* end of file */ return NULL; - } + + enlargeStringInfo(buf, header[1]); *hashvalue = header[0]; - tuple = (MinimalTuple) palloc(header[1]); + buf->len = header[1]; + tuple = (MinimalTuple) buf->data; tuple->t_len = header[1]; BufFileReadExact(file, (char *) tuple + sizeof(uint32), header[1] - sizeof(uint32)); - ExecForceStoreMinimalTuple(tuple, tupleSlot, true); - return tupleSlot; + + return tuple; } diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index a95911c2fe..543f64cdf7 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -37,12 +37,18 @@ extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, extern void ExecHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue); +extern void ExecHashTableInsertTuple(HashJoinTable hashtable, + MinimalTuple tuple, + uint32 hashvalue); extern void ExecParallelHashTableInsert(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue); extern void ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, TupleTableSlot *slot, uint32 hashvalue); +extern void ExecParallelHashTableInsertCurrentBatchTuple(HashJoinTable hashtable, + MinimalTuple tuple, + uint32 hashvalue); extern bool ExecHashGetHashValue(HashJoinTable hashtable, ExprContext *econtext, List *hashkeys, diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index b62c96f206..e5e3a0a155 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2200,6 +2200,7 @@ typedef struct HashJoinState TupleTableSlot *hj_NullOuterTupleSlot; TupleTableSlot *hj_NullInnerTupleSlot; TupleTableSlot *hj_FirstOuterTupleSlot; + StringInfo hj_outerTupleBuffer; int hj_JoinState; bool hj_MatchedOuter; bool hj_OuterNotEmpty;