From c9936b1a2460c3b3cf3a42cf1ef51b4d018c6c07 Mon Sep 17 00:00:00 2001
From: Arseny Sher <sher-ars@ispras.ru>
Date: Sat, 11 Mar 2017 00:36:31 +0300
Subject: [PATCH 5/8] Reversed HashJoin implementation.

The main point is that tuples are pushed immediately after the match, e.g. we
scan the whole bucket in one loop and pushTuple each match.
---
 src/backend/executor/execProcnode.c |  39 +++
 src/backend/executor/nodeHash.c     | 242 +++++++++++++++++-
 src/backend/executor/nodeHashjoin.c | 479 +++++++++++++-----------------------
 src/include/executor/nodeHash.h     |   9 +-
 src/include/executor/nodeHashjoin.h | 100 +++++++-
 src/include/nodes/execnodes.h       |   2 +
 6 files changed, 547 insertions(+), 324 deletions(-)

diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index b0468667bb..88e14d144a 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -156,6 +156,22 @@ ExecInitNode(Plan *node, EState *estate, int eflags, PlanState *parent)
 			result = (PlanState *) ExecInitSeqScan((SeqScan *) node,
 												   estate, eflags, parent);
 			break;
+
+		/*
+		 * join nodes
+		 */
+		case T_HashJoin:
+			result = (PlanState *) ExecInitHashJoin((HashJoin *) node,
+													estate, eflags, parent);
+			break;
+
+		/*
+		 * materialization nodes
+		 */
+		case T_Hash:
+			result = (PlanState *) ExecInitHash((Hash *) node,
+												estate, eflags, parent);
+			break;
 		default:
 			elog(ERROR, "unrecognized/unsupported node type: %d",
 				 (int) nodeTag(node));
@@ -231,6 +247,15 @@ pushTuple(TupleTableSlot *slot, PlanState *node, PlanState *pusher)
 	/* does push come from the outer side? */
 	push_from_outer = outerPlanState(node) == pusher;
 
+	if (nodeTag(node) == T_HashState)
+		return pushTupleToHash(slot, (HashState *) node);
+
+	else if (nodeTag(node) == T_HashJoinState && push_from_outer)
+		return pushTupleToHashJoinFromOuter(slot, (HashJoinState *) node);
+
+	else if (nodeTag(node) == T_HashJoinState && !push_from_outer)
+		return pushTupleToHashJoinFromInner(slot, (HashJoinState *) node);
+
 	elog(ERROR, "node type not supported: %d", (int) nodeTag(node));
 }
 
@@ -280,6 +305,20 @@ ExecEndNode(PlanState *node)
 			ExecEndSeqScan((SeqScanState *) node);
 			break;
 
+		/*
+		 * join nodes
+		 */
+		case T_HashJoinState:
+			ExecEndHashJoin((HashJoinState *) node);
+			break;
+
+		/*
+		 * materialization nodes
+		 */
+		case T_HashState:
+			ExecEndHash((HashState *) node);
+			break;
+
 		default:
 			elog(ERROR, "unrecognized/unsupported node type: %d",
 				 (int) nodeTag(node));
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 43e65ca04e..06fe45f29b 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -50,17 +50,95 @@ static void ExecHashRemoveNextSkewBucket(HashJoinTable hashtable);
 
 static void *dense_alloc(HashJoinTable hashtable, Size size);
 
-/* ----------------------------------------------------------------
- *		ExecHash
- *
- *		stub for pro forma compliance
- * ----------------------------------------------------------------
+
+/* Put incoming tuples to the hastable; when NULL received, finalize building
+ * hashatable and notify HashJoin about that.
  */
-TupleTableSlot *
-ExecHash(HashState *node)
+bool
+pushTupleToHash(TupleTableSlot *slot, HashState *node)
 {
-	elog(ERROR, "Hash node does not support ExecProcNode call convention");
-	return NULL;
+	List	   *hashkeys;
+	HashJoinTable hashtable;
+	ExprContext *econtext;
+	uint32		hashvalue;
+	HashJoinState *hj_node;
+
+	hj_node = (HashJoinState *) node->ps.parent;
+
+	/* Create the hastable. In vanilla Postgres this code is in HashJoin */
+	if (node->first_time_through)
+	{
+		Assert(node->hashtable == NULL);
+
+		node->hashtable = ExecHashTableCreate((Hash *) node->ps.plan,
+											  hj_node->hj_HashOperators,
+											  HJ_FILL_INNER(hj_node));
+
+		/* must provide our own instrumentation support */
+		if (node->ps.instrument)
+			InstrStartNode(node->ps.instrument);
+
+		node->first_time_through = false;
+	}
+
+	/*
+	 * get state info from node
+	 */
+	hashtable = node->hashtable;
+
+	/*
+	 * set expression context
+	 */
+	hashkeys = node->hashkeys;
+	econtext = node->ps.ps_ExprContext;
+
+	/* NULL tuple received; let HashJoin know that the hashtable is built
+	   and exit */
+	if (TupIsNull(slot))
+	{
+		/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
+		if (hashtable->nbuckets != hashtable->nbuckets_optimal)
+			ExecHashIncreaseNumBuckets(hashtable);
+
+		/* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+		hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+		if (hashtable->spaceUsed > hashtable->spacePeak)
+			hashtable->spacePeak = hashtable->spaceUsed;
+
+		/* must provide our own instrumentation support */
+		if (node->ps.instrument)
+			InstrStopNode(node->ps.instrument, hashtable->totalTuples);
+
+		pushTuple(NULL, (PlanState *) node->ps.parent, (PlanState *) node);
+		return false;
+	}
+
+	/* We have to compute the hash value */
+	econtext->ecxt_innertuple = slot;
+	if (ExecHashGetHashValue(hashtable, econtext, hashkeys,
+							 false, hashtable->keepNulls,
+							 &hashvalue))
+	{
+		int			bucketNumber;
+
+		bucketNumber = ExecHashGetSkewBucket(hashtable, hashvalue);
+		if (bucketNumber != INVALID_SKEW_BUCKET_NO)
+		{
+			/* It's a skew tuple, so put it into that hash table */
+			ExecHashSkewTableInsert(hashtable, slot, hashvalue,
+									bucketNumber);
+			hashtable->skewTuples += 1;
+		}
+		else
+		{
+			/* Not subject to skew optimization, so insert normally */
+			ExecHashTableInsert(hashtable, slot, hashvalue);
+		}
+		hashtable->totalTuples += 1;
+	}
+
+	/* ready to accept another tuple */
+	return true;
 }
 
 /* ----------------------------------------------------------------
@@ -159,7 +237,7 @@ MultiExecHash(HashState *node)
  * ----------------------------------------------------------------
  */
 HashState *
-ExecInitHash(Hash *node, EState *estate, int eflags)
+ExecInitHash(Hash *node, EState *estate, int eflags, PlanState *parent)
 {
 	HashState  *hashstate;
 
@@ -172,8 +250,10 @@ ExecInitHash(Hash *node, EState *estate, int eflags)
 	hashstate = makeNode(HashState);
 	hashstate->ps.plan = (Plan *) node;
 	hashstate->ps.state = estate;
+	hashstate->ps.parent = parent;
 	hashstate->hashtable = NULL;
 	hashstate->hashkeys = NIL;	/* will be set by parent HashJoin */
+	hashstate->first_time_through = true;
 
 	/*
 	 * Miscellaneous initialization
@@ -201,7 +281,7 @@ ExecInitHash(Hash *node, EState *estate, int eflags)
 	 * initialize child nodes
 	 */
 	outerPlanState(hashstate) = ExecInitNode(outerPlan(node), estate, eflags,
-											 (PlanState*) hashstate);
+											 (PlanState *) hashstate);
 
 	/*
 	 * initialize tuple type. no need to initialize projection info because
@@ -1115,6 +1195,68 @@ ExecScanHashBucket(HashJoinState *hjstate,
 }
 
 /*
+ * ExecScanHashBucket
+ *		scan a hash bucket for matches to the current outer tuple and push
+ *		them
+ *
+ * The current outer tuple must be stored in econtext->ecxt_outertuple.
+ *
+ * Returns true, if parent still accepts tuples, false otherwise.
+ */
+bool
+ExecScanHashBucketAndPush(HashJoinState *hjstate,
+						  ExprContext *econtext)
+{
+	List	   *hjclauses = hjstate->hashclauses;
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	HashJoinTuple hashTuple;
+	uint32		hashvalue = hjstate->hj_CurHashValue;
+	bool parent_accepts_tuples = true;
+
+	/*
+	 * For now, we don't support pausing execution; we either push all matching
+	 * tuples from the bucket at once or don't touch it at all.
+	 */
+	Assert(hjstate->hj_CurTuple == NULL);
+
+	/*
+	 * If the tuple hashed to a skew bucket then scan the skew bucket
+	 * otherwise scan the standard hashtable bucket.
+	 */
+	if (hjstate->hj_CurSkewBucketNo != INVALID_SKEW_BUCKET_NO)
+		hashTuple = hashtable->skewBucket[hjstate->hj_CurSkewBucketNo]->tuples;
+	else
+		hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
+
+	while (hashTuple != NULL)
+	{
+		if (hashTuple->hashvalue == hashvalue)
+		{
+			TupleTableSlot *inntuple;
+
+			/* insert hashtable's tuple into exec slot so ExecQual sees it */
+			inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+											 hjstate->hj_HashTupleSlot,
+											 false);	/* do not pfree */
+			econtext->ecxt_innertuple = inntuple;
+
+			/* reset temp memory each time to avoid leaks from qual expr */
+			ResetExprContext(econtext);
+
+			if (ExecQual(hjclauses, econtext, false))
+			{
+				hjstate->hj_CurTuple = hashTuple;
+				parent_accepts_tuples = CheckJoinQualAndPush(hjstate);
+			}
+		}
+
+		hashTuple = hashTuple->next;
+	}
+
+	return parent_accepts_tuples;
+}
+
+/*
  * ExecPrepHashTableForUnmatched
  *		set up for a series of ExecScanHashTableForUnmatched calls
  */
@@ -1206,6 +1348,84 @@ ExecScanHashTableForUnmatched(HashJoinState *hjstate, ExprContext *econtext)
 }
 
 /*
+ * ExecScanHashTableForUnmatchedAndPush
+ *		scan the hash table for unmatched inner tuples and push them
+ *
+ * Like ExecScanHashTableForUnmatched, but pushes all tuples immediately.
+ * Returns true, if parent still accepts tuples, false otherwise
+ */
+bool
+ExecScanHashTableForUnmatchedAndPush(HashJoinState *hjstate,
+									 ExprContext *econtext)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	HashJoinTuple hashTuple = NULL;
+	bool parent_accepts_tuples = true;
+
+	/*
+	 * For now, we don't support pausing execution and don't enter here twice
+	 */
+	Assert(hjstate->hj_CurTuple == NULL);
+
+	for (;;)
+	{
+		/*
+		 * hj_CurTuple is the address of the tuple last returned from the
+		 * current bucket, or NULL if it's time to start scanning a new
+		 * bucket.
+		 */
+		if (hashTuple != NULL)
+			hashTuple = hashTuple->next;
+		else if (hjstate->hj_CurBucketNo < hashtable->nbuckets)
+		{
+			hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
+			hjstate->hj_CurBucketNo++;
+		}
+		else if (hjstate->hj_CurSkewBucketNo < hashtable->nSkewBuckets)
+		{
+			int			j = hashtable->skewBucketNums[hjstate->hj_CurSkewBucketNo];
+
+			hashTuple = hashtable->skewBucket[j]->tuples;
+			hjstate->hj_CurSkewBucketNo++;
+		}
+		else
+			break;				/* finished all buckets */
+
+		while (hashTuple != NULL)
+		{
+			if (!HeapTupleHeaderHasMatch(HJTUPLE_MINTUPLE(hashTuple)))
+			{
+				TupleTableSlot *inntuple;
+
+				/* insert hashtable's tuple into exec slot */
+				inntuple = ExecStoreMinimalTuple(HJTUPLE_MINTUPLE(hashTuple),
+												 hjstate->hj_HashTupleSlot,
+												 false);		/* do not pfree */
+				econtext->ecxt_innertuple = inntuple;
+
+				/*
+				 * Reset temp memory each time; although this function doesn't
+				 * do any qual eval, the caller will, so let's keep it
+				 * parallel to ExecScanHashBucket.
+				 */
+				ResetExprContext(econtext);
+
+				/*
+				 * Since right now we don't support pausing execution anyway,
+				 * it is probably unnecessary.
+				 */
+				hjstate->hj_CurTuple = hashTuple;
+				parent_accepts_tuples = PushUnmatched(hjstate);
+			}
+
+			hashTuple = hashTuple->next;
+		}
+	}
+
+	return parent_accepts_tuples;
+}
+
+/*
  * ExecHashTableReset
  *
  *		reset hash table header for new batch
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index b48863f90b..6c637548e1 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -27,172 +27,149 @@
 /*
  * States of the ExecHashJoin state machine
  */
-#define HJ_BUILD_HASHTABLE		1
-#define HJ_NEED_NEW_OUTER		2
-#define HJ_SCAN_BUCKET			3
-#define HJ_FILL_OUTER_TUPLE		4
-#define HJ_FILL_INNER_TUPLES	5
-#define HJ_NEED_NEW_BATCH		6
-
-/* Returns true if doing null-fill on outer relation */
-#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
-/* Returns true if doing null-fill on inner relation */
-#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
-
-static TupleTableSlot *ExecHashJoinOuterGetTuple(PlanState *outerNode,
-						  HashJoinState *hjstate,
-						  uint32 *hashvalue);
-static TupleTableSlot *ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
-						  BufFile *file,
-						  uint32 *hashvalue,
-						  TupleTableSlot *tupleSlot);
+#define HJ_BUILD_HASHTABLE				1
+#define HJ_NEED_NEW_OUTER				2
+#define HJ_SCAN_BUCKET					3
+#define HJ_FILL_OUTER_TUPLE				4
+#define HJ_FILL_INNER_TUPLES			5
+#define HJ_NEED_NEW_BATCH				6
+#define HJ_WAITING_FOR_NEW_OUTER		7
+#define HJ_HANDLE_NEW_OUTER				8
+#define HJ_TAKE_OUTER_FROM_TEMP_FILE	9
+
+static TupleTableSlot *ExecHashJoinGetSavedTuple(BufFile *file,
+												 uint32 *hashvalue,
+												 TupleTableSlot *tupleSlot);
 static bool ExecHashJoinNewBatch(HashJoinState *hjstate);
+static TupleTableSlot *TakeOuterFromTempFile(HashJoinState *hjstate,
+											 uint32 *hashvalue);
 
 
-/* ----------------------------------------------------------------
- *		ExecHashJoin
- *
- *		This function implements the Hybrid Hashjoin algorithm.
- *
- *		Note: the relation we build hash table on is the "inner"
- *			  the other one is "outer".
- * ----------------------------------------------------------------
+
+/*
+ * This function will be called from Hash node with NULL slot, signaling
+ * that the hashtable is built.
+ * "Extract-one-outer-tuple-to-check-if-it-is-null-before-building-hashtable"
+ * optimization is not implemented for now, the hashtable will be always built
+ * first.
+ */
+bool
+pushTupleToHashJoinFromInner(TupleTableSlot *slot, HashJoinState *node)
+{
+	HashJoinTable hashtable;
+	HashState *hashNode;
+
+	hashNode = (HashState *) innerPlanState(node);
+
+	/* we should get there only once */
+	Assert(node->hj_JoinState == HJ_BUILD_HASHTABLE);
+	/* we will fish out the tuples from Hash node ourselves */
+	Assert(TupIsNull(slot));
+
+	/* we always build the hashtable first */
+	node->hj_FirstOuterTupleSlot = NULL;
+
+	hashtable = hashNode->hashtable;
+	node->hj_HashTable = hashtable;
+
+	/*
+	 * need to remember whether nbatch has increased since we
+	 * began scanning the outer relation
+	 */
+	hashtable->nbatch_outstart = hashtable->nbatch;
+
+	/*
+	 * Reset OuterNotEmpty for scan.
+	 */
+	node->hj_OuterNotEmpty = false;
+
+	node->hj_JoinState = HJ_WAITING_FOR_NEW_OUTER;
+
+	/* Don't send us anything on the inner side */
+	return false;
+}
+
+/*
+ * Push from the outer side. Find matches and send them upward to HashJoin's
+ * parent. Return true if this parent ready to accept yet another tuple, false
+ * otherwise. When this function is called, the hashtable must already
+ * be filled.
  */
-TupleTableSlot *				/* return: a tuple or NULL */
-ExecHashJoin(HashJoinState *node)
+bool
+pushTupleToHashJoinFromOuter(TupleTableSlot *slot, HashJoinState *node)
 {
-	PlanState  *outerNode;
-	HashState  *hashNode;
-	List	   *joinqual;
-	List	   *otherqual;
 	ExprContext *econtext;
 	HashJoinTable hashtable;
-	TupleTableSlot *outerTupleSlot;
 	uint32		hashvalue;
 	int			batchno;
 
 	/*
 	 * get information from HashJoin node
 	 */
-	joinqual = node->js.joinqual;
-	otherqual = node->js.ps.qual;
-	hashNode = (HashState *) innerPlanState(node);
-	outerNode = outerPlanState(node);
-	hashtable = node->hj_HashTable;
 	econtext = node->js.ps.ps_ExprContext;
+	hashtable = node->hj_HashTable;
 
-	/*
-	 * Reset per-tuple memory context to free any expression evaluation
-	 * storage allocated in the previous tuple cycle.
-	 */
-	ResetExprContext(econtext);
+	/* We must always be in this state when the tuple is pushed */
+	Assert(node->hj_JoinState == HJ_WAITING_FOR_NEW_OUTER);
 
-	/*
-	 * run the hash join state machine
-	 */
-	for (;;)
+	if (!TupIsNull(slot))
 	{
-		switch (node->hj_JoinState)
+		/*
+		 * We have to compute the tuple's hash value.
+		 */
+		econtext->ecxt_outertuple = slot;
+		if (!ExecHashGetHashValue(hashtable, econtext,
+								  node->hj_OuterHashKeys,
+								  true,		/* outer tuple */
+								  HJ_FILL_OUTER(node),
+								  &hashvalue))
 		{
-			case HJ_BUILD_HASHTABLE:
+			/*
+			 * That tuple couldn't match because of a NULL, so discard it and
+			 * wait for the next one.
+			 */
+			return true;
+		}
+	}
 
-				/*
-				 * First time through: build hash table for inner relation.
-				 */
-				Assert(hashtable == NULL);
+	/* ready to handle this slot */
+	node->hj_JoinState = HJ_HANDLE_NEW_OUTER;
 
-				/*
-				 * If the outer relation is completely empty, and it's not
-				 * right/full join, we can quit without building the hash
-				 * table.  However, for an inner join it is only a win to
-				 * check this when the outer relation's startup cost is less
-				 * than the projected cost of building the hash table.
-				 * Otherwise it's best to build the hash table first and see
-				 * if the inner relation is empty.  (When it's a left join, we
-				 * should always make this check, since we aren't going to be
-				 * able to skip the join on the strength of an empty inner
-				 * relation anyway.)
-				 *
-				 * If we are rescanning the join, we make use of information
-				 * gained on the previous scan: don't bother to try the
-				 * prefetch if the previous scan found the outer relation
-				 * nonempty. This is not 100% reliable since with new
-				 * parameters the outer relation might yield different
-				 * results, but it's a good heuristic.
-				 *
-				 * The only way to make the check is to try to fetch a tuple
-				 * from the outer plan node.  If we succeed, we have to stash
-				 * it away for later consumption by ExecHashJoinOuterGetTuple.
-				 */
-				if (HJ_FILL_INNER(node))
-				{
-					/* no chance to not build the hash table */
-					node->hj_FirstOuterTupleSlot = NULL;
-				}
-				else if (HJ_FILL_OUTER(node) ||
-						 (outerNode->plan->startup_cost < hashNode->ps.plan->total_cost &&
-						  !node->hj_OuterNotEmpty))
+	/* Push tuples matching to the received outer tuple while we can */
+	for (;;)
+	{
+		switch(node->hj_JoinState)
+		{
+			case HJ_NEED_NEW_OUTER:
+				if (hashtable->curbatch == 0)
 				{
-					node->hj_FirstOuterTupleSlot = ExecProcNode(outerNode);
-					if (TupIsNull(node->hj_FirstOuterTupleSlot))
-					{
-						node->hj_OuterNotEmpty = false;
-						return NULL;
-					}
-					else
-						node->hj_OuterNotEmpty = true;
+					/*
+					 * On the first batch, we always fetch tuples from below
+					 * nodes, not from temp files. So, setting the state to
+					 * waiting for new outer and telling the node below that
+					 * we are ready to accept the tuple
+					 */
+					node->hj_JoinState = HJ_WAITING_FOR_NEW_OUTER;
+					return true;
 				}
-				else
-					node->hj_FirstOuterTupleSlot = NULL;
-
-				/*
-				 * create the hash table
-				 */
-				hashtable = ExecHashTableCreate((Hash *) hashNode->ps.plan,
-												node->hj_HashOperators,
-												HJ_FILL_INNER(node));
-				node->hj_HashTable = hashtable;
-
-				/*
-				 * execute the Hash node, to build the hash table
-				 */
-				hashNode->hashtable = hashtable;
-				(void) MultiExecProcNode((PlanState *) hashNode);
-
-				/*
-				 * If the inner relation is completely empty, and we're not
-				 * doing a left outer join, we can quit without scanning the
-				 * outer relation.
-				 */
-				if (hashtable->totalTuples == 0 && !HJ_FILL_OUTER(node))
-					return NULL;
-
 				/*
-				 * need to remember whether nbatch has increased since we
-				 * began scanning the outer relation
+				 * on subsequent batches, we always take tuples from temp
+				 * files
 				 */
-				hashtable->nbatch_outstart = hashtable->nbatch;
-
-				/*
-				 * Reset OuterNotEmpty for scan.  (It's OK if we fetched a
-				 * tuple above, because ExecHashJoinOuterGetTuple will
-				 * immediately set it again.)
-				 */
-				node->hj_OuterNotEmpty = false;
-
-				node->hj_JoinState = HJ_NEED_NEW_OUTER;
+				slot = TakeOuterFromTempFile(node, &hashvalue);
+				/* ready to hande this slot */
+				node->hj_JoinState = HJ_HANDLE_NEW_OUTER;
 
 				/* FALL THRU */
 
-			case HJ_NEED_NEW_OUTER:
-
-				/*
-				 * We don't have an outer tuple, try to get the next one
+			case HJ_HANDLE_NEW_OUTER:
+				/* Handle new outer tuple, either from temp files or nodes
+				 * below. It can be NULL, which means the end of batch.
+				 * hashvalue must be set at this moment, and the tuple must
+				 * be in 'slot' variable.
 				 */
-				outerTupleSlot = ExecHashJoinOuterGetTuple(outerNode,
-														   node,
-														   &hashvalue);
-				if (TupIsNull(outerTupleSlot))
+
+				if (TupIsNull(slot))
 				{
 					/* end of batch, or maybe whole join */
 					if (HJ_FILL_INNER(node))
@@ -206,7 +183,7 @@ ExecHashJoin(HashJoinState *node)
 					continue;
 				}
 
-				econtext->ecxt_outertuple = outerTupleSlot;
+				econtext->ecxt_outertuple = slot;
 				node->hj_MatchedOuter = false;
 
 				/*
@@ -232,14 +209,18 @@ ExecHashJoin(HashJoinState *node)
 					 * Save it in the corresponding outer-batch file.
 					 */
 					Assert(batchno > hashtable->curbatch);
-					ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
+					ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(slot),
 										  hashvalue,
-										&hashtable->outerBatchFile[batchno]);
-					/* Loop around, staying in HJ_NEED_NEW_OUTER state */
-					continue;
+										  &hashtable->outerBatchFile[batchno]);
+					/* In fact, this can only happen while we are processing
+					 * the first batch, so we just wait for the new outer
+					 * tuple
+					 */
+					node->hj_JoinState = HJ_WAITING_FOR_NEW_OUTER;
+					return true;
 				}
 
-				/* OK, let's scan the bucket for matches */
+				/* OK, let's scan this bucket for matches with this tuple */
 				node->hj_JoinState = HJ_SCAN_BUCKET;
 
 				/* FALL THRU */
@@ -254,55 +235,14 @@ ExecHashJoin(HashJoinState *node)
 				CHECK_FOR_INTERRUPTS();
 
 				/*
-				 * Scan the selected hash bucket for matches to current outer
+				 * Push all matching tuples from selected hash bucket
 				 */
-				if (!ExecScanHashBucket(node, econtext))
-				{
-					/* out of matches; check for possible outer-join fill */
-					node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
-					continue;
-				}
+				if (!ExecScanHashBucketAndPush(node, econtext))
+					return false;
 
-				/*
-				 * We've got a match, but still need to test non-hashed quals.
-				 * ExecScanHashBucket already set up all the state needed to
-				 * call ExecQual.
-				 *
-				 * If we pass the qual, then save state for next call and have
-				 * ExecProject form the projection, store it in the tuple
-				 * table, and return the slot.
-				 *
-				 * Only the joinquals determine tuple match status, but all
-				 * quals must pass to actually return the tuple.
-				 */
-				if (joinqual == NIL || ExecQual(joinqual, econtext, false))
-				{
-					node->hj_MatchedOuter = true;
-					HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
+				node->hj_JoinState = HJ_FILL_OUTER_TUPLE;
 
-					/* In an antijoin, we never return a matched tuple */
-					if (node->js.jointype == JOIN_ANTI)
-					{
-						node->hj_JoinState = HJ_NEED_NEW_OUTER;
-						continue;
-					}
-
-					/*
-					 * In a semijoin, we'll consider returning the first
-					 * match, but after that we're done with this outer tuple.
-					 */
-					if (node->js.jointype == JOIN_SEMI)
-						node->hj_JoinState = HJ_NEED_NEW_OUTER;
-
-					if (otherqual == NIL ||
-						ExecQual(otherqual, econtext, false))
-						return ExecProject(node->js.ps.ps_ProjInfo);
-					else
-						InstrCountFiltered2(node, 1);
-				}
-				else
-					InstrCountFiltered1(node, 1);
-				break;
+				/* FALL THRU */
 
 			case HJ_FILL_OUTER_TUPLE:
 
@@ -313,20 +253,16 @@ ExecHashJoin(HashJoinState *node)
 				 */
 				node->hj_JoinState = HJ_NEED_NEW_OUTER;
 
-				if (!node->hj_MatchedOuter &&
-					HJ_FILL_OUTER(node))
+				if (!node->hj_MatchedOuter && HJ_FILL_OUTER(node))
 				{
 					/*
 					 * Generate a fake join tuple with nulls for the inner
-					 * tuple, and return it if it passes the non-join quals.
+					 * tuple, and push it if it passes the non-join quals.
 					 */
 					econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot;
 
-					if (otherqual == NIL ||
-						ExecQual(otherqual, econtext, false))
-						return ExecProject(node->js.ps.ps_ProjInfo);
-					else
-						InstrCountFiltered2(node, 1);
+					if (!CheckOtherQualAndPush(node))
+						return false;
 				}
 				break;
 
@@ -337,24 +273,10 @@ ExecHashJoin(HashJoinState *node)
 				 * so any unmatched inner tuples in the hashtable have to be
 				 * emitted before we continue to the next batch.
 				 */
-				if (!ExecScanHashTableForUnmatched(node, econtext))
-				{
-					/* no more unmatched tuples */
-					node->hj_JoinState = HJ_NEED_NEW_BATCH;
-					continue;
-				}
-
-				/*
-				 * Generate a fake join tuple with nulls for the outer tuple,
-				 * and return it if it passes the non-join quals.
-				 */
-				econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
+				if (!ExecScanHashTableForUnmatchedAndPush(node, econtext))
+					return false;
 
-				if (otherqual == NIL ||
-					ExecQual(otherqual, econtext, false))
-					return ExecProject(node->js.ps.ps_ProjInfo);
-				else
-					InstrCountFiltered2(node, 1);
+				node->hj_JoinState = HJ_NEED_NEW_BATCH;
 				break;
 
 			case HJ_NEED_NEW_BATCH:
@@ -363,17 +285,43 @@ ExecHashJoin(HashJoinState *node)
 				 * Try to advance to next batch.  Done if there are no more.
 				 */
 				if (!ExecHashJoinNewBatch(node))
-					return NULL;	/* end of join */
+				{
+					/* let parent know that we are done */
+					pushTuple(NULL, node->js.ps.parent, (PlanState *) node);
+					return false;	/* end of join */
+				}
 				node->hj_JoinState = HJ_NEED_NEW_OUTER;
 				break;
 
-			default:
-				elog(ERROR, "unrecognized hashjoin state: %d",
-					 (int) node->hj_JoinState);
 		}
 	}
 }
 
+/*
+ * Get next outer tuple from saved temp files. We are processing not the first
+ * batch if we are here. On success, the tuple's hash value is stored at
+ * *hashvalue, re-read from the temp file.
+ * Returns NULL on the end of batch, a tuple otherwise.
+ */
+static TupleTableSlot *TakeOuterFromTempFile(HashJoinState *hjstate,
+											 uint32 *hashvalue)
+{
+	HashJoinTable hashtable = hjstate->hj_HashTable;
+	int			curbatch = hashtable->curbatch;
+	BufFile    *file = hashtable->outerBatchFile[curbatch];
+
+	/*
+	 * In outer-join cases, we could get here even though the batch file
+	 * is empty.
+	 */
+	if (file == NULL)
+		return NULL;
+
+	return ExecHashJoinGetSavedTuple(file,
+									 hashvalue,
+									 hjstate->hj_OuterTupleSlot);
+}
+
 /* ----------------------------------------------------------------
  *		ExecInitHashJoin
  *
@@ -381,7 +329,7 @@ ExecHashJoin(HashJoinState *node)
  * ----------------------------------------------------------------
  */
 HashJoinState *
-ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
+ExecInitHashJoin(HashJoin *node, EState *estate, int eflags, PlanState *parent)
 {
 	HashJoinState *hjstate;
 	Plan	   *outerNode;
@@ -400,6 +348,7 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags)
 	hjstate = makeNode(HashJoinState);
 	hjstate->js.ps.plan = (Plan *) node;
 	hjstate->js.ps.state = estate;
+	hjstate->js.ps.parent = parent;
 
 	/*
 	 * Miscellaneous initialization
@@ -579,89 +528,6 @@ ExecEndHashJoin(HashJoinState *node)
 }
 
 /*
- * ExecHashJoinOuterGetTuple
- *
- *		get the next outer tuple for hashjoin: either by
- *		executing the outer plan node in the first pass, or from
- *		the temp files for the hashjoin batches.
- *
- * Returns a null slot if no more outer tuples (within the current batch).
- *
- * On success, the tuple's hash value is stored at *hashvalue --- this is
- * either originally computed, or re-read from the temp file.
- */
-static TupleTableSlot *
-ExecHashJoinOuterGetTuple(PlanState *outerNode,
-						  HashJoinState *hjstate,
-						  uint32 *hashvalue)
-{
-	HashJoinTable hashtable = hjstate->hj_HashTable;
-	int			curbatch = hashtable->curbatch;
-	TupleTableSlot *slot;
-
-	if (curbatch == 0)			/* if it is the first pass */
-	{
-		/*
-		 * Check to see if first outer tuple was already fetched by
-		 * ExecHashJoin() and not used yet.
-		 */
-		slot = hjstate->hj_FirstOuterTupleSlot;
-		if (!TupIsNull(slot))
-			hjstate->hj_FirstOuterTupleSlot = NULL;
-		else
-			slot = ExecProcNode(outerNode);
-
-		while (!TupIsNull(slot))
-		{
-			/*
-			 * We have to compute the tuple's hash value.
-			 */
-			ExprContext *econtext = hjstate->js.ps.ps_ExprContext;
-
-			econtext->ecxt_outertuple = slot;
-			if (ExecHashGetHashValue(hashtable, econtext,
-									 hjstate->hj_OuterHashKeys,
-									 true,		/* outer tuple */
-									 HJ_FILL_OUTER(hjstate),
-									 hashvalue))
-			{
-				/* remember outer relation is not empty for possible rescan */
-				hjstate->hj_OuterNotEmpty = true;
-
-				return slot;
-			}
-
-			/*
-			 * That tuple couldn't match because of a NULL, so discard it and
-			 * continue with the next one.
-			 */
-			slot = ExecProcNode(outerNode);
-		}
-	}
-	else if (curbatch < hashtable->nbatch)
-	{
-		BufFile    *file = hashtable->outerBatchFile[curbatch];
-
-		/*
-		 * In outer-join cases, we could get here even though the batch file
-		 * is empty.
-		 */
-		if (file == NULL)
-			return NULL;
-
-		slot = ExecHashJoinGetSavedTuple(hjstate,
-										 file,
-										 hashvalue,
-										 hjstate->hj_OuterTupleSlot);
-		if (!TupIsNull(slot))
-			return slot;
-	}
-
-	/* End of this batch */
-	return NULL;
-}
-
-/*
  * ExecHashJoinNewBatch
  *		switch to a new hashjoin batch
  *
@@ -769,8 +635,7 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 					(errcode_for_file_access(),
 				   errmsg("could not rewind hash-join temporary file: %m")));
 
-		while ((slot = ExecHashJoinGetSavedTuple(hjstate,
-												 innerFile,
+		while ((slot = ExecHashJoinGetSavedTuple(innerFile,
 												 &hashvalue,
 												 hjstate->hj_HashTupleSlot)))
 		{
@@ -849,8 +714,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
  * itself is stored in the given slot.
  */
 static TupleTableSlot *
-ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
-						  BufFile *file,
+ExecHashJoinGetSavedTuple(BufFile *file,
 						  uint32 *hashvalue,
 						  TupleTableSlot *tupleSlot)
 {
@@ -893,7 +757,6 @@ ExecHashJoinGetSavedTuple(HashJoinState *hjstate,
 	return ExecStoreMinimalTuple(tuple, tupleSlot, true);
 }
 
-
 void
 ExecReScanHashJoin(HashJoinState *node)
 {
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index fe5c2642d7..1ac95a20fd 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,8 +16,9 @@
 
 #include "nodes/execnodes.h"
 
-extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags);
-extern TupleTableSlot *ExecHash(HashState *node);
+extern HashState *ExecInitHash(Hash *node, EState *estate, int eflags,
+							   PlanState* parent);
+extern bool pushTupleToHash(TupleTableSlot *slot, HashState *node);
 extern Node *MultiExecHash(HashState *node);
 extern void ExecEndHash(HashState *node);
 extern void ExecReScanHash(HashState *node);
@@ -39,9 +40,13 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable,
 						  int *bucketno,
 						  int *batchno);
 extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
+extern bool ExecScanHashBucketAndPush(HashJoinState *hjstate,
+									  ExprContext *econtext);
 extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
 extern bool ExecScanHashTableForUnmatched(HashJoinState *hjstate,
 							  ExprContext *econtext);
+extern bool ExecScanHashTableForUnmatchedAndPush(HashJoinState *hjstate,
+							  ExprContext *econtext);
 extern void ExecHashTableReset(HashJoinTable hashtable);
 extern void ExecHashTableResetMatchFlags(HashJoinTable hashtable);
 extern void ExecChooseHashTableSize(double ntuples, int tupwidth, bool useskew,
diff --git a/src/include/executor/nodeHashjoin.h b/src/include/executor/nodeHashjoin.h
index ddc32b1de3..8b3b88917c 100644
--- a/src/include/executor/nodeHashjoin.h
+++ b/src/include/executor/nodeHashjoin.h
@@ -16,13 +16,107 @@
 
 #include "nodes/execnodes.h"
 #include "storage/buffile.h"
+#include "executor/executor.h"
+#include "executor/hashjoin.h"
+#include "access/htup_details.h"
+#include "utils/memutils.h"
 
-extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate, int eflags);
-extern TupleTableSlot *ExecHashJoin(HashJoinState *node);
+/* Returns true if doing null-fill on outer relation */
+#define HJ_FILL_OUTER(hjstate)	((hjstate)->hj_NullInnerTupleSlot != NULL)
+/* Returns true if doing null-fill on inner relation */
+#define HJ_FILL_INNER(hjstate)	((hjstate)->hj_NullOuterTupleSlot != NULL)
+
+extern HashJoinState *ExecInitHashJoin(HashJoin *node, EState *estate,
+									   int eflags, PlanState *parent);
+extern bool pushTupleToHashJoinFromInner(TupleTableSlot *slot,
+								  HashJoinState *node);
+extern bool pushTupleToHashJoinFromOuter(TupleTableSlot *slot,
+										 HashJoinState *node);
 extern void ExecEndHashJoin(HashJoinState *node);
 extern void ExecReScanHashJoin(HashJoinState *node);
 
 extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 					  BufFile **fileptr);
 
-#endif   /* NODEHASHJOIN_H */
+/* inline funcs decls and implementations */
+#pragma GCC diagnostic warning "-Winline"
+static inline bool CheckOtherQualAndPush(HashJoinState *node);
+static inline bool PushUnmatched(HashJoinState *node);
+static inline bool CheckJoinQualAndPush(HashJoinState *node);
+
+/*
+ * Everything is ready for checking otherqual and projecting; do that,
+ * and push the result.
+ *
+ * Returns true if parent accepts more tuples, false otherwise
+ */
+static inline bool CheckOtherQualAndPush(HashJoinState *node)
+{
+	ExprContext *econtext = node->js.ps.ps_ExprContext;
+	List *otherqual = node->js.ps.qual;
+	TupleTableSlot *slot;
+
+	if (otherqual == NIL ||
+		ExecQual(otherqual, econtext, false))
+	{
+		slot = ExecProject(node->js.ps.ps_ProjInfo);
+		return pushTuple(slot, node->js.ps.parent, (PlanState *) node);
+	}
+	else
+		InstrCountFiltered2(node, 1);
+	return true;
+}
+
+/*
+ * Push inner tuple with no match, ExecScanHashTableForUnmatchedAndPush
+ * prepared state needed for ExecQual.
+ *
+ * Returns true if parent accepts more tuples, false otherwise.
+ */
+static inline bool PushUnmatched(HashJoinState *node)
+{
+	ExprContext *econtext = node->js.ps.ps_ExprContext;
+	/*
+	 * Reset per-tuple memory context to free any expression evaluation
+	 * storage.
+	 */
+	ResetExprContext(econtext);
+
+	/*
+	 * Generate a fake join tuple with nulls for the outer tuple,
+	 * and return it if it passes the non-join quals.
+	 */
+	econtext->ecxt_outertuple = node->hj_NullOuterTupleSlot;
+	return CheckOtherQualAndPush(node);
+}
+
+/*
+ * We have found inner tuple with hashed quals matched to the current outer
+ * tuple. Now check non-hashed quals, other quals, then project and push
+ * the result.
+ *
+ * State for ExecQual was already set by ExecScanHashBucketAndPush and before.
+ * Returns true if parent accepts more tuples, false otherwise.
+ */
+static inline bool CheckJoinQualAndPush(HashJoinState *node)
+{
+	List	   *joinqual = node->js.joinqual;
+	ExprContext *econtext = node->js.ps.ps_ExprContext;
+
+	/*
+	 * Only the joinquals determine tuple match status, but all
+	 * quals must pass to actually return the tuple.
+	 */
+	if (joinqual == NIL || ExecQual(joinqual, econtext, false))
+	{
+		node->hj_MatchedOuter = true;
+		HeapTupleHeaderSetMatch(HJTUPLE_MINTUPLE(node->hj_CurTuple));
+		return CheckOtherQualAndPush(node);
+	}
+	else
+		InstrCountFiltered1(node, 1);
+
+	return true;
+}
+
+#endif	 /* NODEHASHJOIN_H */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index da7fd9c7ac..abbe67ba0c 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2145,6 +2145,8 @@ typedef struct HashState
 	HashJoinTable hashtable;	/* hash table for the hashjoin */
 	List	   *hashkeys;		/* list of ExprState nodes */
 	/* hashkeys is same as parent's hj_InnerHashKeys */
+	/* on the first push we must build the hashtable */
+	bool first_time_through;
 } HashState;
 
 /* ----------------
-- 
2.11.0

