From 59864324c953137555af058413135e80f947f1ce Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Mon, 3 Feb 2020 20:03:28 -0600
Subject: [PATCH v1 3/3] Use tuplesort rather than index scan..

This is largely stolen from heapam_relation_copy_for_cluster, which can either
do an indexscan (if it's cheaper) or tablescan+sort.

This resolves the hack in the previous commit by using tuplesort, since indexes
have already been dropped.
---
 src/backend/commands/tablecmds.c | 135 ++++++++++++++++++++-------------------
 1 file changed, 70 insertions(+), 65 deletions(-)

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 44f94d2..5698cd7 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -98,6 +98,7 @@
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/timestamp.h"
+#include "utils/tuplesort.h"
 #include "utils/typcache.h"
 
 /*
@@ -176,6 +177,7 @@ typedef struct AlteredTableInfo
 	List	   *changedConstraintDefs;	/* string definitions of same */
 	List	   *changedIndexOids;	/* OIDs of indexes to rebuild */
 	List	   *changedIndexDefs;	/* string definitions of same */
+	Tuplesortstate	*tuplesort;	/* sorted for rewrite */
 } AlteredTableInfo;
 
 /* Struct describing one new constraint to check in Phase 3 scan */
@@ -364,7 +366,6 @@ static void ATRewriteTables(AlterTableStmt *parsetree,
 							List **wqueue, LOCKMODE lockmode,
 							AlterTableUtilityContext *context);
 static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode);
-static Oid cluster_index(Relation rel);
 static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
 static void ATSimplePermissions(Relation rel, int allowed_targets);
 static void ATWrongRelkindError(Relation rel, int allowed_targets);
@@ -4295,7 +4296,7 @@ ATRewriteCatalogs(List **wqueue, LOCKMODE lockmode,
 			 * multiple columns of a table are altered).
 			 */
 			if (pass == AT_PASS_ALTER_TYPE)
-				; // ATPostAlterTypeCleanup(wqueue, tab, lockmode);
+				ATPostAlterTypeCleanup(wqueue, tab, lockmode);
 
 			relation_close(rel, NoLock);
 		}
@@ -5006,36 +5007,6 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode,
 	}
 }
 
-/* Return the OID of the index on which the table was previously (SET) clustered */
-/* stolen from alter.c */
-static Oid cluster_index(Relation rel)
-{
-	ListCell   *index;
-
-	/* We need to find the index that has indisclustered set. */
-	foreach(index, RelationGetIndexList(rel))
-	{
-		HeapTuple		idxtuple;
-		Form_pg_index	indexForm;
-		Oid				indexOid;
-
-		indexOid = lfirst_oid(index);
-		idxtuple = SearchSysCache1(INDEXRELID,
-						   ObjectIdGetDatum(indexOid));
-		if (!HeapTupleIsValid(idxtuple))
-				elog(ERROR, "cache lookup failed for index %u", indexOid);
-		indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
-		if (indexForm->indisclustered)
-		{
-				ReleaseSysCache(idxtuple);
-				return indexOid;
-		}
-		ReleaseSysCache(idxtuple);
-	}
-
-	return InvalidOid;
-}
-
 /*
  * ATRewriteTable: scan or rewrite one table
  *
@@ -5046,7 +5017,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 {
 	Relation	oldrel;
 	Relation	newrel;
-	Relation	index;
 	TupleDesc	oldTupDesc;
 	TupleDesc	newTupDesc;
 	bool		needscan = false;
@@ -5058,7 +5028,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	BulkInsertState bistate;
 	int			ti_options;
 	ExprState  *partqualstate = NULL;
-	Oid			OIDindex;
 
 	/*
 	 * Open the relation(s).  We have surely already locked the existing
@@ -5073,9 +5042,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	else
 		newrel = NULL;
 
-	OIDindex = cluster_index(oldrel);
-	index = OidIsValid(OIDindex) ? index_open(OIDindex, lockmode) : NULL;
-
 	/*
 	 * Prepare a BulkInsertState and options for table_tuple_insert. Because
 	 * we're building a new heap, we can skip WAL-logging and fsync it to disk
@@ -5165,8 +5131,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		ExprContext *econtext;
 		TupleTableSlot *oldslot;
 		TupleTableSlot *newslot;
-		TableScanDesc tblscan;
-		IndexScanDesc indscan;
+		TableScanDesc scan;
 		MemoryContext oldCxt;
 		List	   *dropped_attrs = NIL;
 		ListCell   *lc;
@@ -5242,13 +5207,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		 * checking all the constraints.
 		 */
 		snapshot = RegisterSnapshot(GetLatestSnapshot());
-		if (index) {
-			indscan = index_beginscan(oldrel, index, snapshot, 0, 0); // SnapshotAny?
-			tblscan = NULL;
-		} else {
-			tblscan = table_beginscan(oldrel, snapshot, 0, NULL);
-			indscan = NULL;
-		}
+		scan = table_beginscan(oldrel, snapshot, 0, NULL);
 
 		/*
 		 * Switch to per-tuple memory context and reset it for each tuple
@@ -5256,19 +5215,10 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		 */
 		oldCxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 
-		for (;;)
+		while (table_scan_getnextslot(scan, ForwardScanDirection, oldslot))
 		{
 			TupleTableSlot *insertslot;
 
-			if (tblscan) {
-				if (!table_scan_getnextslot(tblscan, ForwardScanDirection, oldslot))
-					break;
-			} else {
-				/* indscan */
-				if (!index_getnext_slot(indscan, ForwardScanDirection, oldslot))
-					break;
-			}
-
 			if (tab->rewrite > 0)
 			{
 				/* Extract data from old tuple */
@@ -5403,10 +5353,15 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 									RelationGetRelationName(oldrel))));
 			}
 
-			/* Write the tuple out to the new relation */
-			if (newrel)
-				table_tuple_insert(newrel, insertslot, mycid,
+			/* Write the tuple out to the new relation, or to tuplesort */
+			if (newrel) {
+				if (tab->tuplesort) {
+					HeapTuple tuple = ExecFetchSlotHeapTuple(insertslot, false, NULL);
+					tuplesort_putheaptuple(tab->tuplesort, tuple);
+				} else
+					table_tuple_insert(newrel, insertslot, mycid,
 								   ti_options, bistate);
+			}
 
 			ResetExprContext(econtext);
 
@@ -5414,10 +5369,47 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 		}
 
 		MemoryContextSwitchTo(oldCxt);
-		if (tblscan)
-			table_endscan(tblscan);
-		if (indscan)
-			index_endscan(indscan);
+		table_endscan(scan);
+
+		if (newrel && tab->tuplesort)
+		{
+			TupleDesc	oldTupDesc = RelationGetDescr(oldrel);
+			TupleDesc	newTupDesc = RelationGetDescr(newrel);
+
+			int			natts;
+			Datum		*values;
+			bool		*isnull;
+			HeapTuple	tuple;
+
+			/* Sort and then write out from the tuplesort to the table */
+			tuplesort_performsort(tab->tuplesort);
+
+			/* Preallocate values/isnull arrays */
+			natts = newTupDesc->natts;
+			values = (Datum *) palloc(natts * sizeof(Datum));
+			isnull = (bool *) palloc(natts * sizeof(bool));
+
+			while ((tuple = tuplesort_getheaptuple(tab->tuplesort, true)) != NULL)
+			{
+				CHECK_FOR_INTERRUPTS();
+
+				/* stolen from reform_and_rewrite_tuple */
+				heap_deform_tuple(tuple, oldTupDesc, values, isnull);
+				/* Be sure to null out any dropped columns */
+				for (int i = 0; i < newTupDesc->natts; i++)
+					if (TupleDescAttr(newTupDesc, i)->attisdropped)
+						isnull[i] = true;
+
+				ExecClearTuple(newslot);
+				memcpy(newslot->tts_values, values, sizeof(Datum)*natts);
+				memcpy(newslot->tts_isnull, isnull, sizeof(bool)*natts);
+				ExecStoreVirtualTuple(newslot);
+				table_tuple_insert(newrel, newslot, mycid, ti_options, bistate);
+
+			}
+			tuplesort_end(tab->tuplesort);
+		}
+
 		UnregisterSnapshot(snapshot);
 
 		ExecDropSingleTupleTableSlot(oldslot);
@@ -5428,8 +5420,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 	FreeExecutorState(estate);
 
 	table_close(oldrel, NoLock);
-	if (index)
-		table_close(index, NoLock);
 	if (newrel)
 	{
 		FreeBulkInsertState(bistate);
@@ -11785,12 +11775,27 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
 	{
 		Oid			oldId = lfirst_oid(oid_item);
 		Oid			relid;
+		Relation		ind;
 
 		relid = IndexGetRelation(oldId, false);
 		ATPostAlterTypeParse(oldId, relid, InvalidOid,
 							 (char *) lfirst(def_item),
 							 wqueue, lockmode, tab->rewrite);
 
+
+		ind = index_open(oldId, ShareUpdateExclusiveLock);
+		if (ind->rd_index->indisclustered) {
+			Relation rel = table_open(relid, ShareUpdateExclusiveLock);
+			TupleDesc oldDesc = RelationGetDescr(rel);
+			table_close(rel, NoLock);
+			/* TODO: parallelize ? */
+			tab->tuplesort = tuplesort_begin_cluster(oldDesc, ind, maintenance_work_mem, NULL, false);
+			index_close(ind, NoLock);
+		} else {
+			tab->tuplesort = NULL;
+			index_close(ind, NoLock);
+		}
+
 		ObjectAddressSet(obj, RelationRelationId, oldId);
 		add_exact_object_address(&obj, objects);
 	}
-- 
2.7.4

