Index: src/backend/commands/cluster.c
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/backend/commands/cluster.c,v
retrieving revision 1.83
diff -c -r1.83 cluster.c
*** src/backend/commands/cluster.c	2002/07/12 18:43:15	1.83
--- src/backend/commands/cluster.c	2002/08/08 06:10:21
***************
*** 27,71 ****
  #include "catalog/dependency.h"
  #include "catalog/heap.h"
  #include "catalog/index.h"
! #include "catalog/pg_index.h"
! #include "catalog/pg_proc.h"
  #include "commands/cluster.h"
  #include "commands/tablecmds.h"
  #include "miscadmin.h"
! #include "utils/builtins.h"
  #include "utils/lsyscache.h"
  #include "utils/syscache.h"
  
  
  static Oid	copy_heap(Oid OIDOldHeap, const char *NewName);
- static Oid	copy_index(Oid OIDOldIndex, Oid OIDNewHeap,
- 					   const char *NewIndexName);
  static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
  
  /*
   * cluster
   *
!  * STILL TO DO:
!  *	 Create a list of all the other indexes on this relation. Because
!  *	 the cluster will wreck all the tids, I'll need to destroy bogus
!  *	 indexes. The user will have to re-create them. Not nice, but
!  *	 I'm not a nice guy. The alternative is to try some kind of post
!  *	 destroy re-build. This may be possible. I'll check out what the
!  *	 index create functiond want in the way of paramaters. On the other
!  *	 hand, re-creating n indexes may blow out the space.
   */
  void
  cluster(RangeVar *oldrelation, char *oldindexname)
  {
  	Oid			OIDOldHeap,
  				OIDOldIndex,
! 				OIDNewHeap,
! 				OIDNewIndex;
  	Relation	OldHeap,
  				OldIndex;
  	char		NewHeapName[NAMEDATALEN];
- 	char		NewIndexName[NAMEDATALEN];
  	ObjectAddress object;
  
  	/*
  	 * We grab exclusive access to the target rel and index for the
--- 27,95 ----
  #include "catalog/dependency.h"
  #include "catalog/heap.h"
  #include "catalog/index.h"
! #include "catalog/indexing.h"
! #include "catalog/catname.h"
  #include "commands/cluster.h"
  #include "commands/tablecmds.h"
  #include "miscadmin.h"
! #include "utils/fmgroids.h"
  #include "utils/lsyscache.h"
  #include "utils/syscache.h"
+ #include "utils/relcache.h"
  
+ /*
+  * We need one of these structs for each index in the relation to be
+  * clustered.  It's basically the data needed by index_create() so
+  * we can recreate the indexes after destroying the old heap.
+  */
+ typedef struct
+ {
+ 	char	   *indexName;
+ 	IndexInfo  *indexInfo;
+ 	Oid			accessMethodOID;
+ 	Oid		   *classOID;
+ 	Oid			indexOID;
+ 	bool		isPrimary;
+ } IndexAttrs;
  
  static Oid	copy_heap(Oid OIDOldHeap, const char *NewName);
  static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
+ static List *get_indexattr_list (Oid OIDOldHeap);
+ static void recreate_indexattr(Oid OIDOldHeap, List *indexes);
+ static void swap_relfilenodes(Oid r1, Oid r2);
  
+ Relation RelationIdGetRelation(Oid relationId);
+ 
  /*
   * cluster
+  *
+  * This clusters the table by creating a new, clustered table and
+  * swapping the relfilenodes of the new table and the old table, so
+  * the OID of the original table is preserved.  Thus we do not lose
+  * GRANT, inheritance nor references to this table (this was a bug
+  * in releases thru 7.3)
   *
!  * Also create new indexes and swap the filenodes with the old indexes
!  * the same way we do for the relation.
!  *
!  * TODO:
!  * 		maybe we can get away with AccessShareLock for the table.
!  * 		Concurrency would be much improved.  Only acquire
!  * 		AccessExclusiveLock right before swapping the filenodes.
!  * 		This would allow users to CLUSTER on a regular basis,
!  * 		practically eliminating the need for auto-clustered indexes.
   */
  void
  cluster(RangeVar *oldrelation, char *oldindexname)
  {
  	Oid			OIDOldHeap,
  				OIDOldIndex,
! 				OIDNewHeap;
  	Relation	OldHeap,
  				OldIndex;
  	char		NewHeapName[NAMEDATALEN];
  	ObjectAddress object;
+ 	List	   *indexes;
  
  	/*
  	 * We grab exclusive access to the target rel and index for the
***************
*** 96,101 ****
--- 120,128 ----
  	heap_close(OldHeap, NoLock);
  	index_close(OldIndex);
  
+ 	/* Save the information of all indexes on the relation. */
+ 	indexes = get_indexattr_list(OIDOldHeap);
+ 
  	/*
  	 * Create the new heap with a temporary name.
  	 */
***************
*** 113,141 ****
  	/* To make the new heap's data visible. */
  	CommandCounterIncrement();
  
! 	/* Create new index over the tuples of the new heap. */
! 	snprintf(NewIndexName, NAMEDATALEN, "temp_%u", OIDOldIndex);
  
- 	OIDNewIndex = copy_index(OIDOldIndex, OIDNewHeap, NewIndexName);
- 
  	CommandCounterIncrement();
  
! 	/* Destroy old heap (along with its index) and rename new. */
  	object.classId = RelOid_pg_class;
! 	object.objectId = OIDOldHeap;
  	object.objectSubId = 0;
  
! 	/* XXX better to use DROP_CASCADE here? */
  	performDeletion(&object, DROP_RESTRICT);
  
  	/* performDeletion does CommandCounterIncrement at end */
  
! 	renamerel(OIDNewHeap, oldrelation->relname);
! 
! 	/* This one might be unnecessary, but let's be safe. */
! 	CommandCounterIncrement();
! 
! 	renamerel(OIDNewIndex, oldindexname);
  }
  
  static Oid
--- 140,166 ----
  	/* To make the new heap's data visible. */
  	CommandCounterIncrement();
  
! 	/* Swap the relfilenodes of the old and new heaps. */
! 	swap_relfilenodes(OIDNewHeap, OIDOldHeap);
  
  	CommandCounterIncrement();
  
! 	/* Destroy new heap with old filenode */
  	object.classId = RelOid_pg_class;
! 	object.objectId = OIDNewHeap;
  	object.objectSubId = 0;
  
! 	/* The relation is local to our transaction and we know nothin
! 	 * depends on it, so DROP_RESTRICT should be OK.
! 	 */
  	performDeletion(&object, DROP_RESTRICT);
  
  	/* performDeletion does CommandCounterIncrement at end */
  
!  	/* Recreate the indexes on the relation.  We do not need
!   	 * CommandCounterIncrement() because recreate_indexattr does it.
!    	 */
!   	recreate_indexattr(OIDOldHeap, indexes);
  }
  
  static Oid
***************
*** 181,223 ****
  	return OIDNewHeap;
  }
  
- static Oid
- copy_index(Oid OIDOldIndex, Oid OIDNewHeap, const char *NewIndexName)
- {
- 	Oid			OIDNewIndex;
- 	Relation	OldIndex,
- 				NewHeap;
- 	IndexInfo  *indexInfo;
- 
- 	NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
- 	OldIndex = index_open(OIDOldIndex);
- 
- 	/*
- 	 * Create a new index like the old one.  To do this I get the info
- 	 * from pg_index, and add a new index with a temporary name (that will
- 	 * be changed later).
- 	 */
- 	indexInfo = BuildIndexInfo(OldIndex->rd_index);
- 
- 	OIDNewIndex = index_create(OIDNewHeap,
- 							   NewIndexName,
- 							   indexInfo,
- 							   OldIndex->rd_rel->relam,
- 							   OldIndex->rd_index->indclass,
- 							   OldIndex->rd_index->indisprimary,
- 							   false, /* XXX losing constraint status */
- 							   allowSystemTableMods);
- 
- 	setRelhasindex(OIDNewHeap, true,
- 				   OldIndex->rd_index->indisprimary, InvalidOid);
- 
- 	index_close(OldIndex);
- 	heap_close(NewHeap, NoLock);
- 
- 	return OIDNewIndex;
- }
- 
- 
  static void
  rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  {
--- 206,211 ----
***************
*** 260,263 ****
--- 248,432 ----
  	index_close(LocalOldIndex);
  	heap_close(LocalOldHeap, NoLock);
  	heap_close(LocalNewHeap, NoLock);
+ }
+ 
+ /* Get the necessary info about the indexes in the relation and
+  * return a List of IndexAttrs.
+  */
+ List *
+ get_indexattr_list (Oid OIDOldHeap)
+ {
+ 	ScanKeyData	entry;
+ 	HeapScanDesc scan;
+ 	Relation indexRelation;
+ 	HeapTuple indexTuple;
+ 	List *indexes = NIL;
+ 	IndexAttrs *attrs;
+ 	HeapTuple tuple;
+ 	Form_pg_index index;
+ 	
+ 	/* Grab the index tuples by looking into RelationRelationName
+ 	 * by the OID of the old heap.
+ 	 */
+ 	indexRelation = heap_openr(IndexRelationName, AccessShareLock);
+ 	ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
+ 			F_OIDEQ, ObjectIdGetDatum(OIDOldHeap));
+ 	scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry);
+ 	while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ 	{
+ 		index = (Form_pg_index) GETSTRUCT(indexTuple);
+ 
+ 		attrs = (IndexAttrs *) palloc(sizeof(IndexAttrs));
+ 		attrs->indexInfo = BuildIndexInfo(index);
+ 		attrs->isPrimary = index->indisprimary;
+ 		attrs->indexOID = index->indexrelid;
+ 
+ 		/* The opclasses are copied verbatim from the original indexes.
+ 		*/
+ 		attrs->classOID = (Oid *)palloc(sizeof(Oid) *
+ 				attrs->indexInfo->ii_NumIndexAttrs);
+ 		memcpy(attrs->classOID, index->indclass,
+ 				sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
+ 
+ 		/* Name and access method of each index come from
+ 		 * RelationRelationName.
+ 		 */
+ 		tuple = SearchSysCache(RELOID,
+ 				ObjectIdGetDatum(attrs->indexOID),
+ 				0, 0, 0);
+ 		if (!HeapTupleIsValid(tuple))
+ 			elog(ERROR, "CLUSTER: cannot find index %u", attrs->indexOID);
+ 		attrs->indexName = pstrdup(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname));
+ 		attrs->accessMethodOID = ((Form_pg_class) GETSTRUCT(tuple))->relam;
+ 		ReleaseSysCache(tuple);
+ 
+ 		/* Cons the gathered data into the list.  We do not care about
+ 		 * ordering, and this is more efficient than append.
+ 		 */
+ 		indexes=lcons((void *)attrs, indexes);
+ 	}
+ 	heap_endscan(scan);
+ 	heap_close(indexRelation, AccessShareLock);
+ 	return indexes;
+ }
+ 
+ /* Create new indexes and swap the filenodes with old indexes.  Then drop
+  * the new index (carrying the old heap along).
+  */
+ void
+ recreate_indexattr(Oid OIDOldHeap, List *indexes)
+ {
+ 	IndexAttrs *attrs;
+ 	List 	   *elem;
+ 	Oid			newIndexOID;
+ 	char		newIndexName[NAMEDATALEN];
+ 	ObjectAddress object;
+ 
+ 	foreach (elem, indexes)
+ 	{
+ 		attrs=(IndexAttrs *) lfirst(elem);
+ 
+ 		/* Create the new index under a temporary name */
+ 		snprintf(newIndexName, NAMEDATALEN, "temp_%u", attrs->indexOID);
+ 
+ 		/* The new index will have constraint status set to false,
+ 		 * but since we will only use its filenode it doesn't matter:
+ 		 * after the filenode swap the index will keep the constraint
+ 		 * status of the old index.
+ 		 */
+ 		newIndexOID = index_create(OIDOldHeap, newIndexName,
+ 								   attrs->indexInfo, attrs->accessMethodOID,
+ 								   attrs->classOID, attrs->isPrimary,
+ 								   false, allowSystemTableMods);
+ 		CommandCounterIncrement();
+ 
+ 		/* Swap the filenodes. */
+ 		swap_relfilenodes(newIndexOID, attrs->indexOID);
+ 		setRelhasindex(OIDOldHeap, true, attrs->isPrimary, InvalidOid);
+ 
+ 		/* Destroy new index with old filenode */
+ 		object.classId = RelOid_pg_class;
+ 		object.objectId = newIndexOID;
+ 		object.objectSubId = 0;
+ 		
+ 		/* The relation is local to our transaction and we know
+ 		 * nothing depends on it, so DROP_RESTRICT should be OK.
+ 		 */
+ 		performDeletion(&object, DROP_RESTRICT);
+ 		
+ 		/* performDeletion does CommandCounterIncrement() at its end */
+ 		
+ 		pfree(attrs->classOID);
+ 		pfree(attrs);
+ 	}
+ 	freeList(indexes);
+ }
+ 
+ /* Swap the relfilenodes for two given relations.
+  */
+ void
+ swap_relfilenodes(Oid r1, Oid r2)
+ {
+ 	/* I can probably keep RelationRelationName open in the main
+ 	 * function and pass the Relation around so I don't have to open
+ 	 * it every time.
+ 	 */
+ 	Relation	relRelation,
+ 				rel;
+ 	HeapTuple	reltup[2];
+ 	Oid			tempRFNode;
+ 	int			i;
+ 	CatalogIndexState indstate;
+ 
+ 	/* We need both RelationRelationName tuples.  */
+ 	relRelation = heap_openr(RelationRelationName, RowExclusiveLock);
+ 
+ 	reltup[0] = SearchSysCacheCopy(RELOID,
+ 								   ObjectIdGetDatum(r1),
+ 								   0, 0, 0);
+ 	if (!HeapTupleIsValid(reltup[0]))
+ 		elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r1);
+ 	reltup[1] = SearchSysCacheCopy(RELOID,
+ 								   ObjectIdGetDatum(r2),
+ 								   0, 0, 0);
+ 	if (!HeapTupleIsValid(reltup[1]))
+ 		elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r2);
+ 
+ 	/* The buffer manager gets confused if we swap relfilenodes for
+ 	 * relations that are not both local or non-local to this transaction.
+ 	 * Flush the buffers on both relations so the buffer manager can
+ 	 * forget about'em.
+ 	 */
+ 
+ 	rel = RelationIdGetRelation(r1);
+ 	i = FlushRelationBuffers(rel, 0);
+ 	if (i < 0)
+ 		elog(ERROR, "CLUSTER: FlushRelationBuffers returned %d", i);
+ 	RelationClose(rel);
+ 	rel = RelationIdGetRelation(r1);
+ 	i = FlushRelationBuffers(rel, 0);
+ 	if (i < 0)
+ 		elog(ERROR, "CLUSTER: FlushRelationBuffers returned %d", i);
+ 	RelationClose(rel);
+ 
+ 	/* Actually swap the filenodes */
+ 
+ 	tempRFNode = ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode;
+ 	((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode =
+ 		((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode;
+ 	((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode = tempRFNode;
+ 
+ 	/* Update the RelationRelationName tuples */
+ 	simple_heap_update(relRelation, &reltup[1]->t_self, reltup[1]);
+ 	simple_heap_update(relRelation, &reltup[0]->t_self, reltup[0]);
+ 	
+ 	/* To keep system catalogs current. */
+ 	indstate = CatalogOpenIndexes(relRelation);
+ 	CatalogIndexInsert(indstate, reltup[1]);
+ 	CatalogIndexInsert(indstate, reltup[0]);
+ 	CatalogCloseIndexes(indstate);
+ 
+ 	heap_close(relRelation, NoLock);
+ 	heap_freetuple(reltup[0]);
+ 	heap_freetuple(reltup[1]);
  }
Index: src/include/commands/cluster.h
===================================================================
RCS file: /projects/cvsroot/pgsql-server/src/include/commands/cluster.h,v
retrieving revision 1.14
diff -c -r1.14 cluster.h
*** src/include/commands/cluster.h	2002/06/20 20:29:49	1.14
--- src/include/commands/cluster.h	2002/08/08 06:10:22
***************
*** 14,26 ****
  #define CLUSTER_H
  
  /*
-  * defines for contant stuff
-  */
- #define _TEMP_RELATION_KEY_				"clXXXXXXXX"
- #define _SIZE_OF_TEMP_RELATION_KEY_		11
- 
- 
- /*
   * functions
   */
  extern void cluster(RangeVar *oldrelation, char *oldindexname);
--- 14,19 ----
Index: doc/src/sgml/ref/cluster.sgml
===================================================================
RCS file: /projects/cvsroot/pgsql-server/doc/src/sgml/ref/cluster.sgml,v
retrieving revision 1.16
diff -c -r1.16 cluster.sgml
*** doc/src/sgml/ref/cluster.sgml	2002/04/23 02:07:15	1.16
--- doc/src/sgml/ref/cluster.sgml	2002/08/08 06:10:22
***************
*** 75,93 ****
       
       
        
- ERROR: relation <tablerelation_number> inherits "table"
-        
-       
-        
- 	
- 	 This is not documented anywhere. It seems not to be possible to
- 	 cluster a table that is inherited.
- 	
-        
-       
-      
-      
-       
  ERROR: Relation table does not exist!
         
        
--- 75,80 ----
***************
*** 139,151 ****
     
  
     
-     The table is actually copied to a temporary table in index
-     order, then renamed back to the original name.  For this
-     reason, all grant permissions and other indexes are lost
-     when clustering is performed.
-    
- 
-    
      In cases where you are accessing single rows randomly
      within a table, the actual order of the data in the heap
      table is unimportant. However, if you tend to access some
--- 126,131 ----
***************
*** 194,199 ****
--- 174,193 ----
      fast because most of the heap data has already been
      ordered, and the existing index is used.
     
+ 
+    
+     During the cluster operation, a temporal table is created that contains
+     the table in the index order. Due to this, you need to have free space
+     on disk at least the size of the table itself, or the biggest index if
+     you have one that is larger than the table.
+    
+ 
+    
+     As opposed to previous releases, CLUSTER does not lose GRANT,
+     inheritance or foreign key information, and preserves indexes
+     other than the one being used for the CLUSTER.
+    
+