From 92ca2c15ccf2adb9b7f2da9cf86b571534287136 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Sat, 15 Jul 2023 15:13:30 -0700
Subject: [PATCH v1] optimize heapgetpage()

---
 src/backend/access/heap/heapam.c | 100 ++++++++++++++++++++++---------
 1 file changed, 72 insertions(+), 28 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 7ed72abe597..d662d2898a2 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -366,6 +366,56 @@ heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlk
 	scan->rs_numblocks = numBlks;
 }
 
+/*
+ * The loop for heapgetpage() in pagemode. Pulled out so it can be called
+ * multiple times, with constant arguments for all_visible,
+ * check_serializable.
+ */
+pg_attribute_always_inline
+static int
+heapgetpage_collect(HeapScanDesc scan, Snapshot snapshot,
+					Page page, Buffer buffer,
+					BlockNumber block, int lines,
+					bool all_visible, bool check_serializable)
+{
+	int			ntup = 0;
+	OffsetNumber lineoff;
+
+	for (lineoff = FirstOffsetNumber; lineoff <= lines; lineoff++)
+	{
+		ItemId		lpp = PageGetItemId(page, lineoff);
+		bool		valid;
+		HeapTupleData loctup;
+
+		if (!ItemIdIsNormal(lpp))
+			continue;
+
+		loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+		loctup.t_len = ItemIdGetLength(lpp);
+		loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
+		ItemPointerSet(&(loctup.t_self), block, lineoff);
+
+		if (all_visible)
+			valid = true;
+		else
+			valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
+
+		if (check_serializable)
+			HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
+												&loctup, buffer, snapshot);
+
+		if (valid)
+		{
+			scan->rs_vistuples[ntup] = lineoff;
+			ntup++;
+		}
+	}
+
+	Assert(ntup <= MaxHeapTuplesPerPage);
+
+	return ntup;
+}
+
 /*
  * heapgetpage - subroutine for heapgettup()
  *
@@ -381,9 +431,8 @@ heapgetpage(TableScanDesc sscan, BlockNumber block)
 	Snapshot	snapshot;
 	Page		page;
 	int			lines;
-	int			ntup;
-	OffsetNumber lineoff;
 	bool		all_visible;
+	bool		check_serializable;
 
 	Assert(block < scan->rs_nblocks);
 
@@ -427,7 +476,6 @@ heapgetpage(TableScanDesc sscan, BlockNumber block)
 	page = BufferGetPage(buffer);
 	TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page);
 	lines = PageGetMaxOffsetNumber(page);
-	ntup = 0;
 
 	/*
 	 * If the all-visible flag indicates that all tuples on the page are
@@ -450,37 +498,33 @@ heapgetpage(TableScanDesc sscan, BlockNumber block)
 	 * tuple for visibility the hard way.
 	 */
 	all_visible = PageIsAllVisible(page) && !snapshot->takenDuringRecovery;
+	check_serializable =
+		CheckForSerializableConflictOutNeeded(scan->rs_base.rs_rd, snapshot);
 
-	for (lineoff = FirstOffsetNumber; lineoff <= lines; lineoff++)
+	/*
+	 * We call heapgetpage_collect() with constant arguments, for faster code,
+	 * without having to duplicate too much logic.
+	 */
+	if (likely(all_visible))
 	{
-		ItemId		lpp = PageGetItemId(page, lineoff);
-		HeapTupleData loctup;
-		bool		valid;
-
-		if (!ItemIdIsNormal(lpp))
-			continue;
-
-		loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
-		loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp);
-		loctup.t_len = ItemIdGetLength(lpp);
-		ItemPointerSet(&(loctup.t_self), block, lineoff);
-
-		if (all_visible)
-			valid = true;
+		if (likely(!check_serializable))
+			scan->rs_ntuples = heapgetpage_collect(scan, snapshot, page, buffer,
+												   block, lines, 1, 0);
 		else
-			valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
-
-		HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
-											&loctup, buffer, snapshot);
-
-		if (valid)
-			scan->rs_vistuples[ntup++] = lineoff;
+			scan->rs_ntuples = heapgetpage_collect(scan, snapshot, page, buffer,
+												   block, lines, 1, 1);
+	}
+	else
+	{
+		if (likely(!check_serializable))
+			scan->rs_ntuples = heapgetpage_collect(scan, snapshot, page, buffer,
+												   block, lines, 0, 0);
+		else
+			scan->rs_ntuples = heapgetpage_collect(scan, snapshot, page, buffer,
+												   block, lines, 0, 1);
 	}
 
 	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-
-	Assert(ntup <= MaxHeapTuplesPerPage);
-	scan->rs_ntuples = ntup;
 }
 
 /*
-- 
2.38.0

