Index: src/backend/commands/cluster.c
RCS file: /projects/cvsroot/pgsql-server/src/backend/commands/cluster.c,v
retrieving revision 1.83
diff -c -r1.83 cluster.c
*** src/backend/commands/cluster.c 2002/07/12 18:43:15 1.83
--- src/backend/commands/cluster.c 2002/08/03 20:44:43
*** 27,71 ****
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
#include "catalog/pg_index.h"
#include "catalog/pg_proc.h"
#include "commands/cluster.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
static Oid copy_heap(Oid OIDOldHeap, const char *NewName);
- static Oid copy_index(Oid OIDOldIndex, Oid OIDNewHeap,
- const char *NewIndexName);
static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
* cluster
! * Create a list of all the other indexes on this relation. Because
! * the cluster will wreck all the tids, I'll need to destroy bogus
! * indexes. The user will have to re-create them. Not nice, but
! * I'm not a nice guy. The alternative is to try some kind of post
! * destroy re-build. This may be possible. I'll check out what the
! * index create functiond want in the way of paramaters. On the other
! * hand, re-creating n indexes may blow out the space.
cluster(RangeVar *oldrelation, char *oldindexname)
Oid OIDOldHeap,
! OIDNewHeap,
! OIDNewIndex;
Relation OldHeap,
char NewHeapName[NAMEDATALEN];
- char NewIndexName[NAMEDATALEN];
ObjectAddress object;
* We grab exclusive access to the target rel and index for the
--- 27,100 ----
#include "catalog/dependency.h"
#include "catalog/heap.h"
#include "catalog/index.h"
+ #include "catalog/indexing.h"
+ #include "catalog/catname.h"
#include "catalog/pg_index.h"
#include "catalog/pg_proc.h"
#include "commands/cluster.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
#include "utils/builtins.h"
+ #include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
+ #include "utils/relcache.h"
+ /*
+ * We need one of these structs for each index in the relation to be
+ * clustered. It's basically the data needed by index_create() so
+ * we can recreate the indexes after destroying the old heap.
+ */
+ typedef struct
+ {
+ char *indexName;
+ IndexInfo *indexInfo;
+ Oid accessMethodOID;
+ Oid *classOID;
+ Oid indexOID;
+ bool isPrimary;
+ } IndexAttrs;
static Oid copy_heap(Oid OIDOldHeap, const char *NewName);
static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
+ static List *get_indexattr_list (Oid OIDOldHeap);
+ static void recreate_indexattr(Oid OIDOldHeap, List *indexes);
+ static void swap_relfilenodes(Oid r1, Oid r2);
+ Relation RelationIdGetRelation(Oid relationId);
* cluster
+ *
+ * This clusters the table by creating a new, clustered table and
+ * swapping the relfilenodes of the new table and the old table, so
+ * the OID of the original table is preserved. Thus we do not lose
+ * GRANT, inheritance nor references to this table (this was a bug
+ * in releases thru 7.3)
+ *
+ * Also create new indexes and swap the filenodes with the old indexes
+ * the same way we do for the relation.
+ *
+ * TODO:
+ * maybe we can get away with AccessShareLock for the table.
+ * Concurrency would be much improved. Only acquire
+ * AccessExclusiveLock right before swapping the filenodes.
+ * This would allow users to CLUSTER on a regular basis,
+ * practically eliminating the need for auto-clustered indexes.
! * Preserve constraint bit for the indexes.
cluster(RangeVar *oldrelation, char *oldindexname)
Oid OIDOldHeap,
! OIDNewHeap;
Relation OldHeap,
char NewHeapName[NAMEDATALEN];
ObjectAddress object;
+ List *indexes;
* We grab exclusive access to the target rel and index for the
*** 96,101 ****
--- 125,133 ----
heap_close(OldHeap, NoLock);
+ /* Save the information of all indexes on the relation. */
+ indexes = get_indexattr_list(OIDOldHeap);
* Create the new heap with a temporary name.
*** 112,141 ****
/* To make the new heap's data visible. */
- /* Create new index over the tuples of the new heap. */
- snprintf(NewIndexName, NAMEDATALEN, "temp_%u", OIDOldIndex);
! OIDNewIndex = copy_index(OIDOldIndex, OIDNewHeap, NewIndexName);
! /* Destroy old heap (along with its index) and rename new. */
object.classId = RelOid_pg_class;
! object.objectId = OIDOldHeap;
object.objectSubId = 0;
! /* XXX better to use DROP_CASCADE here? */
performDeletion(&object, DROP_RESTRICT);
/* performDeletion does CommandCounterIncrement at end */
- renamerel(OIDNewHeap, oldrelation->relname);
! /* This one might be unnecessary, but let's be safe. */
! CommandCounterIncrement();
! renamerel(OIDNewIndex, oldindexname);
static Oid
--- 144,171 ----
/* To make the new heap's data visible. */
! /* Swap the relfilenodes of the old and new heaps. */
! swap_relfilenodes(OIDNewHeap, OIDOldHeap);
! /* Destroy new heap with old filenode */
object.classId = RelOid_pg_class;
! object.objectId = OIDNewHeap;
object.objectSubId = 0;
! /* The relation is local to our transaction and we know nothin
! * depends on it, so DROP_RESTRICT should be OK.
! */
performDeletion(&object, DROP_RESTRICT);
/* performDeletion does CommandCounterIncrement at end */
! /* Recreate the indexes on the relation. We do not need
! * CommandCounterIncrement() because recreate_indexattr does it.
! */
! recreate_indexattr(OIDOldHeap, indexes);
static Oid
*** 181,223 ****
return OIDNewHeap;
- static Oid
- copy_index(Oid OIDOldIndex, Oid OIDNewHeap, const char *NewIndexName)
- {
- Oid OIDNewIndex;
- Relation OldIndex,
- NewHeap;
- IndexInfo *indexInfo;
- NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
- OldIndex = index_open(OIDOldIndex);
- /*
- * Create a new index like the old one. To do this I get the info
- * from pg_index, and add a new index with a temporary name (that will
- * be changed later).
- */
- indexInfo = BuildIndexInfo(OldIndex->rd_index);
- OIDNewIndex = index_create(OIDNewHeap,
- NewIndexName,
- indexInfo,
- OldIndex->rd_rel->relam,
- OldIndex->rd_index->indclass,
- OldIndex->rd_index->indisprimary,
- false, /* XXX losing constraint status */
- allowSystemTableMods);
- setRelhasindex(OIDNewHeap, true,
- OldIndex->rd_index->indisprimary, InvalidOid);
- index_close(OldIndex);
- heap_close(NewHeap, NoLock);
- return OIDNewIndex;
- }
static void
rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
--- 211,216 ----
*** 260,263 ****
--- 253,441 ----
heap_close(LocalOldHeap, NoLock);
heap_close(LocalNewHeap, NoLock);
+ }
+ /* Get the necessary info about the indexes in the relation and
+ * return a List of IndexAttrs.
+ */
+ List *
+ get_indexattr_list (Oid OIDOldHeap)
+ {
+ ScanKeyData entry;
+ HeapScanDesc scan;
+ Relation indexRelation;
+ HeapTuple indexTuple;
+ List *indexes = NIL;
+ IndexAttrs *attrs;
+ HeapTuple tuple;
+ Form_pg_index index;
+ /* Grab the index tuples by looking into RelationRelationName
+ * by the OID of the old heap.
+ */
+ indexRelation = heap_openr(IndexRelationName, AccessShareLock);
+ ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
+ F_OIDEQ, ObjectIdGetDatum(OIDOldHeap));
+ scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry);
+ while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ index = (Form_pg_index) GETSTRUCT(indexTuple);
+ attrs = (IndexAttrs *) palloc(sizeof(IndexAttrs));
+ attrs->indexInfo = BuildIndexInfo(index);
+ attrs->isPrimary = index->indisprimary;
+ attrs->indexOID = index->indexrelid;
+ /* The opclasses are copied verbatim from the original indexes.
+ */
+ attrs->classOID = (Oid *)palloc(sizeof(Oid) *
+ attrs->indexInfo->ii_NumIndexAttrs);
+ memcpy(attrs->classOID, index->indclass,
+ sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
+ /* Name and access method of each index come from
+ * RelationRelationName.
+ */
+ tuple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(attrs->indexOID),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "CLUSTER: cannot find index %u", attrs->indexOID);
+ attrs->indexName = pstrdup(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname));
+ attrs->accessMethodOID = ((Form_pg_class) GETSTRUCT(tuple))->relam;
+ ReleaseSysCache(tuple);
+ /* Cons the gathered data into the list. We do not care about
+ * ordering, and this is more efficient than append.
+ */
+ indexes=lcons((void *)attrs, indexes);
+ }
+ heap_endscan(scan);
+ heap_close(indexRelation, AccessShareLock);
+ return indexes;
+ }
+ /* Create new indexes and swap the filenodes with old indexes. Then drop
+ * the new index (carrying the old heap along).
+ */
+ void
+ recreate_indexattr(Oid OIDOldHeap, List *indexes)
+ {
+ IndexAttrs *attrs;
+ List *elem;
+ Oid newIndexOID;
+ char newIndexName[NAMEDATALEN];
+ ObjectAddress object;
+ foreach (elem, indexes)
+ {
+ attrs=(IndexAttrs *) lfirst(elem);
+ /* Create the new index under a temporary name */
+ snprintf(newIndexName, NAMEDATALEN, "temp_%u", attrs->indexOID);
+ /* The new index will have constraint status set to false,
+ * but since we will only use its filenode it doesn't matter:
+ * after the filenode swap the index will keep the constraint
+ * status of the old index.
+ */
+ newIndexOID = index_create(OIDOldHeap, newIndexName,
+ attrs->indexInfo, attrs->accessMethodOID,
+ attrs->classOID, attrs->isPrimary,
+ false, allowSystemTableMods);
+ CommandCounterIncrement();
+ /* Swap the filenodes. */
+ swap_relfilenodes(newIndexOID, attrs->indexOID);
+ setRelhasindex(OIDOldHeap, true, attrs->isPrimary, InvalidOid);
+ /* I'm not sure this one is needed, but let's be safe. */
+ CommandCounterIncrement();
+ /* Destroy new index with old filenode */
+ object.classId = RelOid_pg_class;
+ object.objectId = newIndexOID;
+ object.objectSubId = 0;
+ /* The relation is local to our transaction and we know
+ * nothing depends on it, so DROP_RESTRICT should be OK.
+ */
+ performDeletion(&object, DROP_RESTRICT);
+ /* performDeletion does CommandCounterIncrement() at its end */
+ pfree(attrs->classOID);
+ pfree(attrs);
+ }
+ freeList(indexes);
+ }
+ /* Swap the relfilenodes for two given relations.
+ */
+ void
+ swap_relfilenodes(Oid r1, Oid r2)
+ {
+ /* I can probably keep RelationRelationName open in the main
+ * function and pass the Relation around so I don't have to open
+ * it every time.
+ */
+ Relation relRelation,
+ irels[Num_pg_class_indices],
+ rel;
+ HeapTuple reltup[2];
+ Oid tempRFNode;
+ int i;
+ /* We need both RelationRelationName tuples. */
+ relRelation = heap_openr(RelationRelationName, RowExclusiveLock);
+ reltup[0] = SearchSysCacheCopy(RELOID,
+ ObjectIdGetDatum(r1),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(reltup[0]))
+ elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r1);
+ reltup[1] = SearchSysCacheCopy(RELOID,
+ ObjectIdGetDatum(r2),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(reltup[1]))
+ elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r2);
+ /* The buffer manager gets confused if we swap relfilenodes for
+ * relations that are not both local or non-local to this transaction.
+ * Flush the buffers on both relations so the buffer manager can
+ * forget about'em.
+ */
+ rel = RelationIdGetRelation(r1);
+ i = FlushRelationBuffers(rel, 0);
+ if (i < 0)
+ elog(ERROR, "CLUSTER: FlushRelationBuffers returned %d", i);
+ RelationClose(rel);
+ rel = RelationIdGetRelation(r1);
+ i = FlushRelationBuffers(rel, 0);
+ if (i < 0)
+ elog(ERROR, "CLUSTER: FlushRelationBuffers returned %d", i);
+ RelationClose(rel);
+ /* Actually swap the filenodes */
+ tempRFNode = ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode;
+ ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode =
+ ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode;
+ ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode = tempRFNode;
+ /* Update the RelationRelationName tuples */
+ simple_heap_update(relRelation, &reltup[1]->t_self, reltup[1]);
+ simple_heap_update(relRelation, &reltup[0]->t_self, reltup[0]);
+ /* To keep system catalogs current. */
+ CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, irels);
+ CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[1]);
+ CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[0]);
+ CatalogCloseIndices(Num_pg_class_indices, irels);
+ CommandCounterIncrement();
+ heap_close(relRelation, NoLock);
+ heap_freetuple(reltup[0]);
+ heap_freetuple(reltup[1]);
Index: doc/src/sgml/ref/cluster.sgml
RCS file: /projects/cvsroot/pgsql-server/doc/src/sgml/ref/cluster.sgml,v
retrieving revision 1.16
diff -c -r1.16 cluster.sgml
*** doc/src/sgml/ref/cluster.sgml 2002/04/23 02:07:15 1.16
--- doc/src/sgml/ref/cluster.sgml 2002/08/03 20:44:44
*** 75,93 ****
- ERROR: relation <tablerelation_number> inherits "table"
- This is not documented anywhere. It seems not to be possible to
- cluster a table that is inherited.
ERROR: Relation table does not exist!
--- 75,80 ----
*** 139,151 ****
- The table is actually copied to a temporary table in index
- order, then renamed back to the original name. For this
- reason, all grant permissions and other indexes are lost
- when clustering is performed.
In cases where you are accessing single rows randomly
within a table, the actual order of the data in the heap
table is unimportant. However, if you tend to access some
--- 126,131 ----
*** 194,199 ****
--- 174,193 ----
fast because most of the heap data has already been
ordered, and the existing index is used.
+ During the cluster operation, a temporal table is created that contains
+ the table in the index order. Due to this, you need to have free space
+ on disk at least the size of the table itself, or the biggest index if
+ you have one that is larger than the table.
+ As opposed to previous releases, CLUSTER does not lose GRANT,
+ inheritance or foreign key information, and preserves indexes
+ other than the one being used for the CLUSTER.