From: | Greg Stark <gsstark(at)mit(dot)edu> |
---|---|
To: | pgsql-patches(at)postgresql(dot)org |
Subject: | ALTER TABLE ADD/DROP INHERITS |
Date: | 2006-06-07 18:14:44 |
Message-ID: | 87r720omsr.fsf@stark.xeocode.com |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-patches |
As described on -hackers, this is my work so far on adding ADD/DROP INHERITS. It
implements the controversial "ALTER TABLE <table> ADD/DROP INHERITS <parent>"
syntax, which requires making INHERITS a reserved keyword. I haven't seen a
clear consensus yet on what the best syntax to use here would be.
Also, it doesn't handle default column values yet.
Other than that, I think it's complete. There are a number of things I'm not
completely certain I'm on the right track with, though, so it can certainly use
some more eyeballs on it.
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.184
diff -u -p -c -r1.184 tablecmds.c
cvs diff: conflicting specifications of output style
*** src/backend/commands/tablecmds.c 10 May 2006 23:18:39 -0000 1.184
--- src/backend/commands/tablecmds.c 7 Jun 2006 18:09:56 -0000
*************** typedef struct NewColumnValue
*** 159,166 ****
--- 159,168 ----
static void truncate_check_rel(Relation rel);
static List *MergeAttributes(List *schema, List *supers, bool istemp,
List **supOids, List **supconstr, int *supOidCount);
+ static void MergeAttributesIntoExisting(Relation rel, Relation relation);
static bool change_varattnos_of_a_node(Node *node, const AttrNumber *newattno);
static void StoreCatalogInheritance(Oid relationId, List *supers);
+ static void StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation catalogRelation);
static int findAttrByName(const char *attributeName, List *schema);
static void setRelhassubclassInRelation(Oid relationId, bool relhassubclass);
static bool needs_toast_table(Relation rel);
*************** static void ATPrepSetTableSpace(AlteredT
*** 246,251 ****
--- 248,255 ----
static void ATExecSetTableSpace(Oid tableOid, Oid newTableSpace);
static void ATExecEnableDisableTrigger(Relation rel, char *trigname,
bool enable, bool skip_system);
+ static void ATExecAddInherits(Relation rel, RangeVar *parent);
+ static void ATExecDropInherits(Relation rel, RangeVar *parent);
static void copy_relation_data(Relation rel, SMgrRelation dst);
static void update_ri_trigger_args(Oid relid,
const char *oldname,
*************** static void
*** 1156,1165 ****
StoreCatalogInheritance(Oid relationId, List *supers)
{
Relation relation;
- TupleDesc desc;
int16 seqNumber;
ListCell *entry;
- HeapTuple tuple;
/*
* sanity checks
--- 1160,1167 ----
*************** StoreCatalogInheritance(Oid relationId,
*** 1179,1194 ****
* anymore, there's no need to look for indirect ancestors.)
*/
relation = heap_open(InheritsRelationId, RowExclusiveLock);
- desc = RelationGetDescr(relation);
seqNumber = 1;
foreach(entry, supers)
{
! Oid parentOid = lfirst_oid(entry);
Datum datum[Natts_pg_inherits];
char nullarr[Natts_pg_inherits];
ObjectAddress childobject,
parentobject;
datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
--- 1181,1206 ----
* anymore, there's no need to look for indirect ancestors.)
*/
relation = heap_open(InheritsRelationId, RowExclusiveLock);
seqNumber = 1;
foreach(entry, supers)
{
! StoreCatalogInheritance1(relationId, lfirst_oid(entry), seqNumber, relation);
! seqNumber += 1;
! }
!
! heap_close(relation, RowExclusiveLock);
! }
!
! static void
! StoreCatalogInheritance1(Oid relationId, Oid parentOid, int16 seqNumber, Relation relation)
! {
Datum datum[Natts_pg_inherits];
char nullarr[Natts_pg_inherits];
ObjectAddress childobject,
parentobject;
+ HeapTuple tuple;
+ TupleDesc desc = RelationGetDescr(relation);
datum[0] = ObjectIdGetDatum(relationId); /* inhrel */
datum[1] = ObjectIdGetDatum(parentOid); /* inhparent */
*************** StoreCatalogInheritance(Oid relationId,
*** 1222,1234 ****
* Mark the parent as having subclasses.
*/
setRelhassubclassInRelation(parentOid, true);
- seqNumber += 1;
- }
- heap_close(relation, RowExclusiveLock);
}
/*
* Look for an existing schema entry with the given name.
*
--- 1234,1246 ----
* Mark the parent as having subclasses.
*/
setRelhassubclassInRelation(parentOid, true);
+
}
+
+
/*
* Look for an existing schema entry with the given name.
*
*************** ATPrepCmd(List **wqueue, Relation rel, A
*** 2053,2063 ****
--- 2065,2078 ----
case AT_DisableTrig: /* DISABLE TRIGGER variants */
case AT_DisableTrigAll:
case AT_DisableTrigUser:
+ case AT_AddInherits:
+ case AT_DropInherits:
ATSimplePermissions(rel, false);
/* These commands never recurse */
/* No command-specific prep needed */
pass = AT_PASS_MISC;
break;
+
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
(int) cmd->subtype);
*************** ATExecCmd(AlteredTableInfo *tab, Relatio
*** 2233,2238 ****
--- 2248,2259 ----
case AT_DisableTrigUser: /* DISABLE TRIGGER USER */
ATExecEnableDisableTrigger(rel, NULL, false, true);
break;
+ case AT_DropInherits:
+ ATExecDropInherits(rel, cmd->parent);
+ break;
+ case AT_AddInherits:
+ ATExecAddInherits(rel, cmd->parent);
+ break;
default: /* oops */
elog(ERROR, "unrecognized alter table type: %d",
(int) cmd->subtype);
*************** ATExecEnableDisableTrigger(Relation rel,
*** 5880,5885 ****
--- 5901,6198 ----
EnableDisableTrigger(rel, trigname, enable, skip_system);
}
+ static void
+ ATExecAddInherits(Relation rel, RangeVar *parent)
+ {
+ Relation relation, catalogRelation;
+ SysScanDesc scan;
+ ScanKeyData key;
+ HeapTuple inheritsTuple;
+ int4 inhseqno = 0;
+ ListCell *child;
+ List *children;
+
+ relation = heap_openrv(parent, AccessShareLock); /* XXX is this enough locking? */
+ if (relation->rd_rel->relkind != RELKIND_RELATION)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("inherited relation \"%s\" is not a table",
+ parent->relname)));
+
+
+ /* Permanent rels cannot inherit from temporary ones */
+ if (!rel->rd_istemp && isTempNamespace(RelationGetNamespace(relation)))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot inherit from temporary relation \"%s\"",
+ parent->relname)));
+
+ if (!pg_class_ownercheck(RelationGetRelid(relation), GetUserId()))
+ aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
+ RelationGetRelationName(relation));
+
+ /* If parent has OIDs then all children must have OIDs */
+ if (relation->rd_rel->relhasoids && !rel->rd_rel->relhasoids)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("table \"%s\" without OIDs cannot inherit from table \"%s\" with OIDs",
+ RelationGetRelationName(rel), parent->relname)));
+
+ /*
+ * Reject duplications in the list of parents. -- this is the same check as
+ * when creating a table, but maybe we should check for the parent anywhere
+ * higher in the inheritance structure?
+ */
+ catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+ ScanKeyInit(&key,
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true, SnapshotNow, 1, &key);
+ while (HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+ {
+ Form_pg_inherits inh = (Form_pg_inherits) GETSTRUCT(inheritsTuple);
+ if (inh->inhparent == RelationGetRelid(relation))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_TABLE),
+ errmsg("inherited relation \"%s\" duplicated",
+ parent->relname)));
+ if (inh->inhseqno > inhseqno)
+ inhseqno = inh->inhseqno;
+ }
+ systable_endscan(scan);
+ heap_close(catalogRelation, RowExclusiveLock);
+
+ /* Get children because we have to manually recurse and also because we
+ * have to check for recursive inheritance graphs */
+
+ /* this routine is actually in the planner */
+ children = find_all_inheritors(RelationGetRelid(rel));
+
+ if (list_member_oid(children, RelationGetRelid(relation)))
+ ereport(ERROR,
+ (errcode(ERRCODE_DUPLICATE_TABLE),
+ errmsg("Circular inheritance structure found")));
+
+ foreach(child, children)
+ {
+ Oid childrelid = lfirst_oid(child);
+ Relation childrel;
+
+ childrel = relation_open(childrelid, AccessExclusiveLock);
+ MergeAttributesIntoExisting(childrel, relation);
+ relation_close(childrel, NoLock);
+ }
+
+ catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+ StoreCatalogInheritance1(RelationGetRelid(rel), RelationGetRelid(relation), inhseqno+1, catalogRelation);
+ heap_close(catalogRelation, RowExclusiveLock);
+
+ heap_close(relation, AccessShareLock);
+ }
+
+
+ static void
+ MergeAttributesIntoExisting(Relation rel, Relation relation)
+ {
+ Relation attrdesc;
+ AttrNumber parent_attno, child_attno;
+ TupleDesc tupleDesc;
+ TupleConstr *constr;
+ HeapTuple tuple;
+
+ child_attno = RelationGetNumberOfAttributes(rel);
+
+ tupleDesc = RelationGetDescr(relation);
+ constr = tupleDesc->constr;
+
+ for (parent_attno = 1; parent_attno <= tupleDesc->natts;
+ parent_attno++)
+ {
+ Form_pg_attribute attribute = tupleDesc->attrs[parent_attno - 1];
+ char *attributeName = NameStr(attribute->attname);
+
+ /* Ignore dropped columns in the parent. */
+ if (attribute->attisdropped)
+ continue;
+
+ /* Does it conflict with an existing column? */
+ attrdesc = heap_open(AttributeRelationId, RowExclusiveLock);
+
+ tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), attributeName);
+ if (HeapTupleIsValid(tuple)) {
+ /*
+ * Yes, try to merge the two column definitions. They must
+ * have the same type and typmod.
+ */
+ Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(tuple);
+ ereport(NOTICE,
+ (errmsg("merging column \"%s\" with inherited definition",
+ attributeName)));
+ if (attribute->atttypid != childatt->atttypid ||
+ attribute->atttypmod != childatt->atttypmod ||
+ (attribute->attnotnull && !childatt->attnotnull))
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("child table \"%s\" has different type for column \"%s\"",
+ RelationGetRelationName(rel), NameStr(attribute->attname))));
+
+ childatt->attinhcount++;
+ simple_heap_update(attrdesc, &tuple->t_self, tuple);
+ CatalogUpdateIndexes(attrdesc, tuple); /* XXX strength reduce openindexes to outside loop? */
+ heap_freetuple(tuple);
+
+ /* XXX defaults */
+
+ } else {
+ /*
+ * No, create a new inherited column
+ */
+
+ FormData_pg_attribute attributeD;
+ HeapTuple attributeTuple = heap_addheader(Natts_pg_attribute,
+ false,
+ ATTRIBUTE_TUPLE_SIZE,
+ (void *) &attributeD);
+ Form_pg_attribute childatt = (Form_pg_attribute) GETSTRUCT(attributeTuple);
+
+ if (attribute->attnotnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_DATATYPE_MISMATCH),
+ errmsg("Cannot add new inherited NOT NULL column \"%s\"",
+ NameStr(attribute->attname))));
+
+ childatt->attrelid = RelationGetRelid(rel);
+ namecpy(&childatt->attname, &attribute->attname);
+ childatt->atttypid = attribute->atttypid;
+ childatt->attstattarget = -1;
+ childatt->attlen = attribute->attlen;
+ childatt->attcacheoff = -1;
+ childatt->atttypmod = attribute->atttypmod;
+ childatt->attnum = ++child_attno;
+ childatt->attbyval = attribute->attbyval;
+ childatt->attndims = attribute->attndims;
+ childatt->attstorage = attribute->attstorage;
+ childatt->attalign = attribute->attalign;
+ childatt->attnotnull = false;
+ childatt->atthasdef = false; /* XXX */
+ childatt->attisdropped = false;
+ childatt->attislocal = false;
+ childatt->attinhcount = attribute->attinhcount+1;
+
+ simple_heap_insert(attrdesc, attributeTuple);
+ CatalogUpdateIndexes(attrdesc, attributeTuple);
+ heap_freetuple(attributeTuple);
+
+ /* XXX Defaults */
+
+ }
+ heap_close(attrdesc, RowExclusiveLock);
+ }
+
+ }
+
+ static void
+ ATExecDropInherits(Relation rel, RangeVar *parent)
+ {
+
+
+ Relation catalogRelation;
+ SysScanDesc scan;
+ ScanKeyData key;
+ HeapTuple inheritsTuple, attributeTuple;
+ Oid inhparent;
+ Oid dropparent;
+ int found = 0;
+
+ /* Get the OID of parent -- if no schema is specified use the regular
+ * search path and only drop the one table that's found. We could try to be
+ * clever and look at each parent and see if it matches but that would be
+ * inconsistent with other operations I think. */
+
+ Assert(rel);
+ Assert(parent);
+
+ dropparent = RangeVarGetRelid(parent, false);
+
+ /* Search through the direct parents of rel looking for dropparent oid */
+
+ catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
+ ScanKeyInit(&key,
+ Anum_pg_inherits_inhrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true, SnapshotNow, 1, &key);
+ while (!found && HeapTupleIsValid(inheritsTuple = systable_getnext(scan)))
+ {
+ inhparent = ((Form_pg_inherits) GETSTRUCT(inheritsTuple))->inhparent;
+ if (inhparent == dropparent) {
+ simple_heap_delete(catalogRelation, &inheritsTuple->t_self);
+ found = 1;
+ }
+ }
+ systable_endscan(scan);
+ heap_close(catalogRelation, RowExclusiveLock);
+
+
+ if (!found) {
+ /* would it be better to look up the actual schema of dropparent and
+ * make the error message explicitly name the qualified name it's
+ * trying to drop ?*/
+ if (parent->schemaname)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("relation \"%s.%s\" is not a parent of relation \"%s\"",
+ parent->schemaname, parent->relname, RelationGetRelationName(rel))));
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_TABLE),
+ errmsg("relation \"%s\" is not a parent of relation \"%s\"",
+ parent->relname, RelationGetRelationName(rel))));
+ }
+
+ /* Search through columns looking for matching columns from parent table */
+
+ catalogRelation = heap_open(AttributeRelationId, RowExclusiveLock);
+ ScanKeyInit(&key,
+ Anum_pg_attribute_attrelid,
+ BTEqualStrategyNumber, F_OIDEQ,
+ ObjectIdGetDatum(RelationGetRelid(rel)));
+ scan = systable_beginscan(catalogRelation, AttributeRelidNumIndexId, true, SnapshotNow, 1, &key);
+ while (HeapTupleIsValid(attributeTuple = systable_getnext(scan))) {
+ Form_pg_attribute att = ((Form_pg_attribute)GETSTRUCT(attributeTuple));
+ /* Not an inherited column at all
+ * (do NOT use islocal for this test--it can be true for inherited columns)
+ */
+ if (att->attinhcount == 0)
+ continue;
+ if (att->attisdropped) /* XXX Is this right? */
+ continue;
+ if (SearchSysCacheExistsAttName(dropparent, NameStr(att->attname))) {
+ /* Decrement inhcount and possibly set islocal to 1 */
+ HeapTuple copyTuple = heap_copytuple(attributeTuple);
+ Form_pg_attribute copy_att = ((Form_pg_attribute)GETSTRUCT(copyTuple));
+
+ copy_att->attinhcount--;
+ if (copy_att->attinhcount == 0)
+ copy_att->attislocal = 1;
+
+ simple_heap_update(catalogRelation, &copyTuple->t_self, copyTuple);
+ /* XXX "Avoid using it for multiple tuples, since opening the
+ * indexes and building the index info structures is moderately
+ * expensive." Perhaps this can be moved outside the loop or else
+ * at least the CatalogOpenIndexes/CatalogCloseIndexes moved
+ * outside the loop but when I try that it seg faults?!*/
+ CatalogUpdateIndexes(catalogRelation, copyTuple);
+ heap_freetuple(copyTuple);
+ }
+ }
+ systable_endscan(scan);
+ heap_close(catalogRelation, RowExclusiveLock);
+ }
+
+
+
/*
* ALTER TABLE CREATE TOAST TABLE
*
Index: src/backend/nodes/copyfuncs.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/nodes/copyfuncs.c,v
retrieving revision 1.335
diff -u -p -c -r1.335 copyfuncs.c
cvs diff: conflicting specifications of output style
*** src/backend/nodes/copyfuncs.c 30 Apr 2006 18:30:38 -0000 1.335
--- src/backend/nodes/copyfuncs.c 7 Jun 2006 18:09:56 -0000
*************** _copyAlterTableCmd(AlterTableCmd *from)
*** 1799,1804 ****
--- 1799,1805 ----
COPY_SCALAR_FIELD(subtype);
COPY_STRING_FIELD(name);
COPY_NODE_FIELD(def);
+ COPY_NODE_FIELD(parent);
COPY_NODE_FIELD(transform);
COPY_SCALAR_FIELD(behavior);
Index: src/backend/parser/gram.y
===================================================================
RCS file: /projects/cvsroot/pgsql/src/backend/parser/gram.y,v
retrieving revision 2.545
diff -u -p -c -r2.545 gram.y
cvs diff: conflicting specifications of output style
*** src/backend/parser/gram.y 27 May 2006 17:38:45 -0000 2.545
--- src/backend/parser/gram.y 7 Jun 2006 18:09:57 -0000
*************** alter_table_cmd:
*** 1514,1519 ****
--- 1514,1535 ----
n->subtype = AT_DisableTrigUser;
$$ = (Node *)n;
}
+ /* ALTER TABLE <name> ADD INHERITS <parent> */
+ | ADD_P INHERITS qualified_name
+ {
+ AlterTableCmd *n = makeNode(AlterTableCmd);
+ n->subtype = AT_AddInherits;
+ n->parent = $3;
+ $$ = (Node *)n;
+ }
+ /* ALTER TABLE <name> DROP INHERITS <parent> */
+ | DROP INHERITS qualified_name
+ {
+ AlterTableCmd *n = makeNode(AlterTableCmd);
+ n->subtype = AT_DropInherits;
+ n->parent = $3;
+ $$ = (Node *)n;
+ }
| alter_rel_cmd
{
$$ = $1;
*************** unreserved_keyword:
*** 8422,8428 ****
| INCREMENT
| INDEX
| INHERIT
- | INHERITS
| INPUT_P
| INSENSITIVE
| INSERT
--- 8438,8443 ----
*************** func_name_keyword:
*** 8640,8645 ****
--- 8655,8661 ----
*/
reserved_keyword:
ALL
+ | INHERITS
| ANALYSE
| ANALYZE
| AND
Index: src/include/nodes/parsenodes.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/include/nodes/parsenodes.h,v
retrieving revision 1.310
diff -u -p -c -r1.310 parsenodes.h
cvs diff: conflicting specifications of output style
*** src/include/nodes/parsenodes.h 30 Apr 2006 18:30:40 -0000 1.310
--- src/include/nodes/parsenodes.h 7 Jun 2006 18:10:00 -0000
*************** typedef enum AlterTableType
*** 874,880 ****
AT_EnableTrigAll, /* ENABLE TRIGGER ALL */
AT_DisableTrigAll, /* DISABLE TRIGGER ALL */
AT_EnableTrigUser, /* ENABLE TRIGGER USER */
! AT_DisableTrigUser /* DISABLE TRIGGER USER */
} AlterTableType;
typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */
--- 874,882 ----
AT_EnableTrigAll, /* ENABLE TRIGGER ALL */
AT_DisableTrigAll, /* DISABLE TRIGGER ALL */
AT_EnableTrigUser, /* ENABLE TRIGGER USER */
! AT_DisableTrigUser, /* DISABLE TRIGGER USER */
! AT_AddInherits, /* ADD INHERITS parent */
! AT_DropInherits /* DROP INHERITS parent */
} AlterTableType;
typedef struct AlterTableCmd /* one subcommand of an ALTER TABLE */
*************** typedef struct AlterTableCmd /* one subc
*** 883,888 ****
--- 885,891 ----
AlterTableType subtype; /* Type of table alteration to apply */
char *name; /* column, constraint, or trigger to act on,
* or new owner or tablespace */
+ RangeVar *parent; /* Parent table for add/drop inherits */
Node *def; /* definition of new column, column type,
* index, or constraint */
Node *transform; /* transformation expr for ALTER TYPE */
--
greg
From | Date | Subject | |
---|---|---|---|
Next Message | Andrew Dunstan | 2006-06-07 18:47:27 | Re: ALTER TABLE ADD/DROP INHERITS |
Previous Message | Jim C. Nasby | 2006-06-07 17:19:12 | Re: remove lock protection on HeapTupleSatisfiesVacuum |