Index: src/backend/commands/cluster.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/commands/cluster.c,v retrieving revision 1.83 diff -c -r1.83 cluster.c *** src/backend/commands/cluster.c 12 Jul 2002 18:43:15 -0000 1.83 --- src/backend/commands/cluster.c 14 Jul 2002 06:38:44 -0000 *************** *** 15,21 **** * * * IDENTIFICATION ! * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.83 2002/07/12 18:43:15 tgl Exp $ * *------------------------------------------------------------------------- */ --- 15,21 ---- * * * IDENTIFICATION ! * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.82 2002/06/20 20:29:26 momjian Exp $ * *------------------------------------------------------------------------- */ *************** *** 24,74 **** #include "access/genam.h" #include "access/heapam.h" - #include "catalog/dependency.h" #include "catalog/heap.h" #include "catalog/index.h" #include "catalog/pg_index.h" #include "catalog/pg_proc.h" #include "commands/cluster.h" #include "commands/tablecmds.h" #include "miscadmin.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/syscache.h" static Oid copy_heap(Oid OIDOldHeap, const char *NewName); - static Oid copy_index(Oid OIDOldIndex, Oid OIDNewHeap, - const char *NewIndexName); static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); /* * cluster * * STILL TO DO: ! * Create a list of all the other indexes on this relation. Because ! * the cluster will wreck all the tids, I'll need to destroy bogus ! * indexes. The user will have to re-create them. Not nice, but ! * I'm not a nice guy. The alternative is to try some kind of post ! * destroy re-build. This may be possible. I'll check out what the ! * index create functiond want in the way of paramaters. On the other ! * hand, re-creating n indexes may blow out the space. */ void cluster(RangeVar *oldrelation, char *oldindexname) { Oid OIDOldHeap, OIDOldIndex, ! OIDNewHeap, ! OIDNewIndex; Relation OldHeap, OldIndex; char NewHeapName[NAMEDATALEN]; ! char NewIndexName[NAMEDATALEN]; ObjectAddress object; /* ! * We grab exclusive access to the target rel and index for the * duration of the transaction. */ OldHeap = heap_openrv(oldrelation, AccessExclusiveLock); --- 24,100 ---- #include "access/genam.h" #include "access/heapam.h" #include "catalog/heap.h" + #include "catalog/dependency.h" #include "catalog/index.h" + #include "catalog/indexing.h" + #include "catalog/catname.h" #include "catalog/pg_index.h" #include "catalog/pg_proc.h" #include "commands/cluster.h" #include "commands/tablecmds.h" #include "miscadmin.h" #include "utils/builtins.h" + #include "utils/fmgroids.h" #include "utils/lsyscache.h" + #include "utils/relcache.h" #include "utils/syscache.h" + /* + * We need one of these structs for each index in the relation to be + * clustered. It's basically the data needed by index_create() so + * we can recreate the indexes after destroying the old heap. + */ + typedef struct + { + char *indexName; + IndexInfo *indexInfo; + Oid accessMethodOID; + Oid *classOID; + Oid indexOID; + bool isPrimary; + } IndexAttrs; static Oid copy_heap(Oid OIDOldHeap, const char *NewName); static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex); + List *get_indexattr_list (Oid OIDOldHeap); + void recreate_indexattr(Oid OIDOldHeap, List *indexes); + void swap_relfilenodes(Oid r1, Oid r2); + + void + RelationClearRelation(Relation relation, bool rebuildIt); /* * cluster * * STILL TO DO: ! * Keep foreign keys, permissions and inheritance of the clustered table. ! * ! * We need to look at making use of the ability to write a new version of a ! * table (or index) under a new relfilenode value, without changing the ! * table's OID. */ void cluster(RangeVar *oldrelation, char *oldindexname) { Oid OIDOldHeap, OIDOldIndex, ! OIDNewHeap; Relation OldHeap, OldIndex; char NewHeapName[NAMEDATALEN]; ! List *indexes; ObjectAddress object; + /* The games with filenodes may not be rollbackable, so + * disallow running this inside a transaction block. + * This may be a false assumption though. + */ + if (IsTransactionBlock()) + elog (ERROR, "CLUSTER: may not be called inside a transaction block"); + /* ! * We grab exclusive access to the target relation for the * duration of the transaction. */ OldHeap = heap_openrv(oldrelation, AccessExclusiveLock); *************** *** 96,143 **** heap_close(OldHeap, NoLock); index_close(OldIndex); ! /* ! * Create the new heap with a temporary name. ! */ ! snprintf(NewHeapName, NAMEDATALEN, "temp_%u", OIDOldHeap); OIDNewHeap = copy_heap(OIDOldHeap, NewHeapName); /* We do not need CommandCounterIncrement() because copy_heap did it. */ ! /* ! * Copy the heap data into the new table in the desired order. ! */ rebuildheap(OIDNewHeap, OIDOldHeap, OIDOldIndex); /* To make the new heap's data visible. */ CommandCounterIncrement(); - /* Create new index over the tuples of the new heap. */ - snprintf(NewIndexName, NAMEDATALEN, "temp_%u", OIDOldIndex); ! OIDNewIndex = copy_index(OIDOldIndex, OIDNewHeap, NewIndexName); ! CommandCounterIncrement(); ! /* Destroy old heap (along with its index) and rename new. */ object.classId = RelOid_pg_class; ! object.objectId = OIDOldHeap; object.objectSubId = 0; /* XXX better to use DROP_CASCADE here? */ performDeletion(&object, DROP_RESTRICT); - /* performDeletion does CommandCounterIncrement at end */ - - renamerel(OIDNewHeap, oldrelation->relname); - - /* This one might be unnecessary, but let's be safe. */ - CommandCounterIncrement(); - - renamerel(OIDNewIndex, oldindexname); } static Oid copy_heap(Oid OIDOldHeap, const char *NewName) { --- 122,165 ---- heap_close(OldHeap, NoLock); index_close(OldIndex); ! /* Save the information of all indexes on the relation. */ ! indexes = get_indexattr_list(OIDOldHeap); + /* Create the new heap with a temporary name. */ + snprintf(NewHeapName, NAMEDATALEN, "temp_%u", OIDOldHeap); OIDNewHeap = copy_heap(OIDOldHeap, NewHeapName); /* We do not need CommandCounterIncrement() because copy_heap did it. */ ! /* Copy the heap data into the new table in the desired order. */ rebuildheap(OIDNewHeap, OIDOldHeap, OIDOldIndex); /* To make the new heap's data visible. */ CommandCounterIncrement(); ! /* Swap the relfilenodes of the old and new heaps. */ ! swap_relfilenodes(OIDNewHeap, OIDOldHeap); ! /* At this point, Old points to the new file, and new points to old */ ! ! /* Recreate the indexes on the relation. We do not need ! * CommandCounterIncrement() because recreate_indexattr does it. ! */ ! recreate_indexattr(OIDOldHeap, indexes); ! /* Destroy the new heap, carrying the old filenode along. */ object.classId = RelOid_pg_class; ! object.objectId = OIDNewHeap; object.objectSubId = 0; /* XXX better to use DROP_CASCADE here? */ performDeletion(&object, DROP_RESTRICT); /* performDeletion does CommandCounterIncrement at end */ } + /* Create and initialize the new heap + */ static Oid copy_heap(Oid OIDOldHeap, const char *NewName) { *************** *** 181,223 **** return OIDNewHeap; } ! static Oid ! copy_index(Oid OIDOldIndex, Oid OIDNewHeap, const char *NewIndexName) ! { ! Oid OIDNewIndex; ! Relation OldIndex, ! NewHeap; ! IndexInfo *indexInfo; ! ! NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock); ! OldIndex = index_open(OIDOldIndex); ! ! /* ! * Create a new index like the old one. To do this I get the info ! * from pg_index, and add a new index with a temporary name (that will ! * be changed later). ! */ ! indexInfo = BuildIndexInfo(OldIndex->rd_index); ! ! OIDNewIndex = index_create(OIDNewHeap, ! NewIndexName, ! indexInfo, ! OldIndex->rd_rel->relam, ! OldIndex->rd_index->indclass, ! OldIndex->rd_index->indisprimary, ! false, /* XXX losing constraint status */ ! allowSystemTableMods); ! ! setRelhasindex(OIDNewHeap, true, ! OldIndex->rd_index->indisprimary, InvalidOid); ! ! index_close(OldIndex); ! heap_close(NewHeap, NoLock); ! ! return OIDNewIndex; ! } ! ! static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) { --- 203,210 ---- return OIDNewHeap; } ! /* Load the data into the new heap, clustered. ! */ static void rebuildheap(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex) { *************** *** 260,263 **** --- 247,415 ---- index_close(LocalOldIndex); heap_close(LocalOldHeap, NoLock); heap_close(LocalNewHeap, NoLock); + } + + /* Get the necessary info about the indexes in the relation and + * return a List of IndexAttrs. + */ + List * + get_indexattr_list (Oid OIDOldHeap) + { + ScanKeyData entry; + HeapScanDesc scan; + Relation indexRelation; + HeapTuple indexTuple; + List *indexes = NIL; + IndexAttrs *attrs; + HeapTuple tuple; + Form_pg_index index; + + /* Grab the index tuples by looking into RelationRelationName + * by the OID of the old heap. + */ + indexRelation = heap_openr(IndexRelationName, AccessShareLock); + ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid, + F_OIDEQ, ObjectIdGetDatum(OIDOldHeap)); + scan = heap_beginscan(indexRelation, SnapshotNow, 1, &entry); + while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + index = (Form_pg_index) GETSTRUCT(indexTuple); + + attrs = (IndexAttrs *) palloc(sizeof(IndexAttrs)); + attrs->indexInfo = BuildIndexInfo(index); + attrs->isPrimary = index->indisprimary; + attrs->indexOID = index->indexrelid; + + /* The opclasses are copied verbatim from the original indexes. + */ + attrs->classOID = (Oid *)palloc(sizeof(Oid) * + attrs->indexInfo->ii_NumIndexAttrs); + memcpy(attrs->classOID, index->indclass, + sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs); + + /* Name and access method of each index come from + * RelationRelationName. + */ + tuple = SearchSysCache(RELOID, + ObjectIdGetDatum(attrs->indexOID), + 0, 0, 0); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "CLUSTER: cannot find index %u", attrs->indexOID); + attrs->indexName = pstrdup(NameStr(((Form_pg_class) GETSTRUCT(tuple))->relname)); + attrs->accessMethodOID = ((Form_pg_class) GETSTRUCT(tuple))->relam; + ReleaseSysCache(tuple); + + /* Cons the gathered data into the list. We do not care about + * ordering, and this is more efficient than append. + */ + indexes=lcons((void *)attrs, indexes); + } + heap_endscan(scan); + heap_close(indexRelation, NoLock); + return indexes; + } + + /* Create new indexes and swap the filenodes with old indexes. Then drop + * the new index (carrying the old heap along). + */ + void + recreate_indexattr(Oid OIDOldHeap, List *indexes) + { + IndexAttrs *attrs; + List *elem; + Oid newIndexOID; + char newIndexName[NAMEDATALEN]; + + foreach (elem, indexes) + { + attrs=(IndexAttrs *) lfirst(elem); + + /* Create the new index under a temporary name */ + snprintf(newIndexName, NAMEDATALEN, "temp_%u", attrs->indexOID); + newIndexOID = index_create(OIDOldHeap, newIndexName, + attrs->indexInfo, attrs->accessMethodOID, + attrs->classOID, attrs->isPrimary, + false, /* XXX losing constraint status */ + allowSystemTableMods); + CommandCounterIncrement(); + + /* Swap the filenodes. */ + swap_relfilenodes(newIndexOID, attrs->indexOID); + setRelhasindex(OIDOldHeap, true, attrs->isPrimary, InvalidOid); + + /* I'm not sure this one is needed, but let's be safe. */ + CommandCounterIncrement(); + + /* Drop the new index, carrying the old filenode along. */ + index_drop(newIndexOID); + CommandCounterIncrement(); + + pfree(attrs->classOID); + pfree(attrs); + } + freeList(indexes); + } + + /* Swap the relfilenodes for two given relations. + */ + void + swap_relfilenodes(Oid r1, Oid r2) + { + /* I can probably keep RelationRelationName open in the main + * function and pass the Relation around so I don't have to open + * it avery time. + */ + Relation relRelation, + irels[Num_pg_class_indices]; + HeapTuple reltup[2]; + Oid tempRFNode; + + RelationClearRelation(RelationIdGetRelation(r1), false); + // RelationClearRelation(RelationIdGetRelation(r2), false); + + CommandCounterIncrement(); + /* We need both RelationRelationName tuples. */ + relRelation = heap_openr(RelationRelationName, RowExclusiveLock); + + reltup[0] = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(r1), + 0, 0, 0); + if (!HeapTupleIsValid(reltup[0])) + elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r1); + reltup[1] = SearchSysCacheCopy(RELOID, + ObjectIdGetDatum(r2), + 0, 0, 0); + if (!HeapTupleIsValid(reltup[1])) + elog(ERROR, "CLUSTER: Cannot find tuple for relation %u", r2); + + /* Swap the filenodes */ + tempRFNode = ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode; + ((Form_pg_class) GETSTRUCT(reltup[0]))->relfilenode = + ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode; + ((Form_pg_class) GETSTRUCT(reltup[1]))->relfilenode = tempRFNode; + + /* Update the RelationRelationName tuples */ + simple_heap_update(relRelation, &reltup[0]->t_self, reltup[0]); + simple_heap_update(relRelation, &reltup[1]->t_self, reltup[1]); + + /* Keep system catalogs current */ + CatalogOpenIndices(Num_pg_class_indices, Name_pg_class_indices, irels); + CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[0]); + CatalogIndexInsert(irels, Num_pg_class_indices, relRelation, reltup[1]); + CatalogCloseIndices(Num_pg_class_indices, irels); + + heap_close(relRelation, RowExclusiveLock); + heap_freetuple(reltup[0]); + heap_freetuple(reltup[1]); + + CommandCounterIncrement(); + + // bjm + // RelationIdInvalidateRelationCacheByRelationId(r1); + // RelationIdInvalidateRelationCacheByRelationId(r2); + + RelationClearRelation(RelationIdGetRelation(r1), true); + // RelationClearRelation(RelationIdGetRelation(r2), true); + + CommandCounterIncrement(); } Index: src/backend/utils/cache/relcache.c =================================================================== RCS file: /cvsroot/pgsql/src/backend/utils/cache/relcache.c,v retrieving revision 1.166 diff -c -r1.166 relcache.c *** src/backend/utils/cache/relcache.c 12 Jul 2002 18:43:18 -0000 1.166 --- src/backend/utils/cache/relcache.c 14 Jul 2002 06:38:55 -0000 *************** *** 238,249 **** (void *)&(RELATION->rd_id), \ HASH_REMOVE, NULL); \ if (idhentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc that does not exist."); \ nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \ (void *)&(RELATION->rd_node), \ HASH_REMOVE, NULL); \ if (nodentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc that does not exist."); \ if (IsSystemNamespace(RelationGetNamespace(RELATION))) \ { \ char *relname = RelationGetRelationName(RELATION); \ --- 238,249 ---- (void *)&(RELATION->rd_id), \ HASH_REMOVE, NULL); \ if (idhentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc (rd_id) that does not exist."); \ nodentry = (RelNodeCacheEnt*)hash_search(RelationNodeCache, \ (void *)&(RELATION->rd_node), \ HASH_REMOVE, NULL); \ if (nodentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc (rd_node) that does not exist."); \ if (IsSystemNamespace(RelationGetNamespace(RELATION))) \ { \ char *relname = RelationGetRelationName(RELATION); \ *************** *** 252,258 **** relname, \ HASH_REMOVE, NULL); \ if (namehentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc that does not exist."); \ } \ } while(0) --- 252,258 ---- relname, \ HASH_REMOVE, NULL); \ if (namehentry == NULL) \ ! elog(WARNING, "trying to delete a reldesc (relname) that does not exist."); \ } \ } while(0) *************** *** 276,282 **** /* non-export function prototypes */ ! static void RelationClearRelation(Relation relation, bool rebuildIt); #ifdef ENABLE_REINDEX_NAILED_RELATIONS static void RelationReloadClassinfo(Relation relation); --- 276,282 ---- /* non-export function prototypes */ ! //static void RelationClearRelation(Relation relation, bool rebuildIt); #ifdef ENABLE_REINDEX_NAILED_RELATIONS static void RelationReloadClassinfo(Relation relation); *************** *** 683,689 **** */ rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock); rewrite_tupdesc = RelationGetDescr(rewrite_desc); ! rewrite_scan = systable_beginscan(rewrite_desc, RewriteRelRulenameIndex, true, SnapshotNow, 1, &key); --- 683,689 ---- */ rewrite_desc = heap_openr(RewriteRelationName, AccessShareLock); rewrite_tupdesc = RelationGetDescr(rewrite_desc); ! rewrite_scan = systable_beginscan(rewrite_desc, RewriteRelRulenameIndex, true, SnapshotNow, 1, &key); *************** *** 1651,1657 **** * (one with refcount > 0). However, this routine just does whichever * it's told to do; callers must determine which they want. */ ! static void RelationClearRelation(Relation relation, bool rebuildIt) { MemoryContext oldcxt; --- 1651,1658 ---- * (one with refcount > 0). However, this routine just does whichever * it's told to do; callers must determine which they want. */ ! //bjm ! void RelationClearRelation(Relation relation, bool rebuildIt) { MemoryContext oldcxt;