diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index 721d24783b..1ea4003aec 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -10634,7 +10634,7 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel) * Because we're only expanding the key space at the referenced side, * we don't need to prevent any operation in the referencing table, so * AccessShareLock suffices (assumes that dropping the constraint - * acquires AEL). + * acquires AccessExclusiveLock). */ fkRel = table_open(constrForm->conrelid, AccessShareLock); @@ -19175,8 +19175,10 @@ DetachPartitionFinalize(Relation rel, Relation partRel, bool concurrent, DropClonedTriggersFromPartition(RelationGetRelid(partRel)); /* - * Detach any foreign keys that are inherited. This includes creating - * additional action triggers. + * Detach any foreign keys that are inherited. This requires marking some + * constraints and triggers as no longer having parents, as well as + * creating additional constraint entries and triggers, depending on the + * shape of each FK. */ fks = copyObject(RelationGetFKeyList(partRel)); if (fks != NIL) @@ -19185,10 +19187,15 @@ DetachPartitionFinalize(Relation rel, Relation partRel, bool concurrent, { ForeignKeyCacheInfo *fk = lfirst(cell); HeapTuple contup; + HeapTuple parentConTup; Form_pg_constraint conform; + Form_pg_constraint parentConForm; Constraint *fkconstraint; - Oid insertTriggerOid, - updateTriggerOid; + Oid parentConstrOid; + Oid insertCheckTrigOid, + updateCheckTrigOid, + updateActionTrigOid, + deleteActionTrigOid; contup = SearchSysCache1(CONSTROID, ObjectIdGetDatum(fk->conoid)); if (!HeapTupleIsValid(contup)) @@ -19203,28 +19210,34 @@ DetachPartitionFinalize(Relation rel, Relation partRel, bool concurrent, continue; } - /* unset conparentid and adjust conislocal, coninhcount, etc. */ - ConstraintSetParentConstraint(fk->conoid, InvalidOid, InvalidOid); - /* - * Also, look up the partition's "check" triggers corresponding to the - * constraint being detached and detach them from the parent triggers. + * To create a FK constraint in the to-be-detached partition + * equivalent to what exists in the parent table, what to do depends + * on whether the referenced table is partitioned or not. If it + * isn't, then just reparenting the pg_constraint row and pg_trigger + * rows for the existing constraint, and creating new action triggers, + * is sufficient. However, if the referenced table is partitioned, + * then we must create additional pg_constraint rows -- one for each + * partition -- and the necessary action triggers for each of them. */ - GetForeignKeyCheckTriggers(trigrel, - fk->conoid, fk->confrelid, fk->conrelid, - &insertTriggerOid, &updateTriggerOid); - Assert(OidIsValid(insertTriggerOid)); - TriggerSetParentTrigger(trigrel, insertTriggerOid, InvalidOid, - RelationGetRelid(partRel)); - Assert(OidIsValid(updateTriggerOid)); - TriggerSetParentTrigger(trigrel, updateTriggerOid, InvalidOid, - RelationGetRelid(partRel)); + parentConstrOid = conform->conparentid; - /* - * Make the action triggers on the referenced relation. When this was - * a partition the action triggers pointed to the parent rel (they - * still do), but now we need separate ones of our own. - */ + Assert(OidIsValid(conform->conparentid)); + parentConTup = SearchSysCache1(CONSTROID, + ObjectIdGetDatum(parentConstrOid)); + if (!HeapTupleIsValid(parentConTup)) + elog(ERROR, "cache lookup failed for constraint %u", + conform->conparentid); + parentConForm = (Form_pg_constraint) GETSTRUCT(parentConTup); + + if (parentConForm->confrelid == RelationGetRelid(rel)) + { + elog(WARNING, "detaching self-referencing FK %u (parent %u) from %s to %s", + conform->oid, conform->conparentid, + RelationGetRelationName(partRel), RelationGetRelationName(rel)); + } + + /* Create a node we'll use throughout this */ fkconstraint = makeNode(Constraint); fkconstraint->contype = CONSTRAINT_FOREIGN; fkconstraint->conname = pstrdup(NameStr(conform->conname)); @@ -19240,15 +19253,178 @@ DetachPartitionFinalize(Relation rel, Relation partRel, bool concurrent, fkconstraint->fk_del_set_cols = NIL; fkconstraint->old_conpfeqop = NIL; fkconstraint->old_pktable_oid = InvalidOid; - fkconstraint->skip_validation = false; + fkconstraint->skip_validation = true; /* XXX hmmm */ fkconstraint->initially_valid = true; + /* + * The constraint on this table must be marked no longer a child of + * the parent's constraint; and its check triggers too. + */ + ConstraintSetParentConstraint(fk->conoid, InvalidOid, InvalidOid); + + GetForeignKeyCheckTriggers(trigrel, + fk->conoid, fk->confrelid, fk->conrelid, + &insertCheckTrigOid, &updateCheckTrigOid); + Assert(OidIsValid(insertCheckTrigOid)); + TriggerSetParentTrigger(trigrel, insertCheckTrigOid, InvalidOid, + RelationGetRelid(partRel)); + Assert(OidIsValid(updateCheckTrigOid)); + TriggerSetParentTrigger(trigrel, updateCheckTrigOid, InvalidOid, + RelationGetRelid(partRel)); + + /* + * Now create the action triggers in this table that point to the + * referenced table. + */ createForeignKeyActionTriggers(partRel, conform->confrelid, fkconstraint, fk->conoid, conform->conindid, InvalidOid, InvalidOid, - NULL, NULL); + &deleteActionTrigOid, &updateActionTrigOid); + /* + * If the referenced side is partitioned (which we know because our + * parent's constraint points to a different relation than ours) then + * we must, in addition to the above, create pg_constraint rows that + * point to each partition, each with its own action triggers. + */ + if (parentConForm->conrelid != conform->conrelid) + { + List *children; + ListCell *child; + Relation refdRel; + Oid indexOid; + int numfks; + AttrNumber conkey[INDEX_MAX_KEYS]; + AttrNumber confkey[INDEX_MAX_KEYS]; + Oid conpfeqop[INDEX_MAX_KEYS]; + Oid conppeqop[INDEX_MAX_KEYS]; + Oid conffeqop[INDEX_MAX_KEYS]; + int numfkdelsetcols; + AttrNumber confdelsetcols[INDEX_MAX_KEYS]; + List *colnames = NIL; + char *fkaddition; + + DeconstructFkConstraintRow(contup, + &numfks, + conkey, + confkey, + conpfeqop, + conppeqop, + conffeqop, + &numfkdelsetcols, + confdelsetcols); + + for (int i = 0; i < numfks; i++) + { + Form_pg_attribute att; + + att = TupleDescAttr(RelationGetDescr(partRel), + conkey[i] - 1); + colnames = lappend(colnames, makeString(NameStr(att->attname))); + } + fkaddition = ChooseForeignKeyConstraintNameAddition(colnames); + + /* + * We're not expanding nor shrinking key space, so AccessShareLock + * is sufficient here given that dropping a constraint requires + * AccessExclusiveLock. + */ + refdRel = table_open(fk->confrelid, AccessShareLock); + + /* + * When the referenced table is partitioned, we need new + * pg_constraint entries that point from our partition to each + * partition of the other table, and also the action triggers on + * each. + */ + children = find_all_inheritors(fk->confrelid, NoLock, NULL); + foreach(child, children) + { + Oid childoid = lfirst_oid(child); + Relation fchild; + AttrNumber mapped_confkey[INDEX_MAX_KEYS]; + AttrMap *attmap; + char *conname; + Oid constrOid; + ObjectAddress address, + referenced; + + /* A constraint for the topmost parent already exists */ + if (childoid == fk->confrelid) + continue; + + fchild = table_open(childoid, AccessShareLock); + + attmap = build_attrmap_by_name(RelationGetDescr(refdRel), + RelationGetDescr(fchild), + false); + for (int i = 0; i < numfks; i++) + mapped_confkey[i] = attmap->attnums[confkey[i] - 1]; + + conname = ChooseConstraintName(RelationGetRelationName(partRel), + fkaddition, "fkey", + RelationGetNamespace(partRel), NIL); + + indexOid = index_get_partition(fchild, conform->conindid); + + constrOid = + CreateConstraintEntry(conname, + RelationGetNamespace(partRel), + CONSTRAINT_FOREIGN, + fkconstraint->deferrable, + fkconstraint->initdeferred, + fkconstraint->initially_valid, + fk->conoid, + RelationGetRelid(partRel), + mapped_confkey, + numfks, + numfks, + InvalidOid, + indexOid, + childoid, + conkey, + conpfeqop, + conppeqop, + conffeqop, + numfks, + fkconstraint->fk_upd_action, + fkconstraint->fk_del_action, + confdelsetcols, + numfkdelsetcols, + fkconstraint->fk_matchtype, + NULL, + NULL, + NULL, + false, + 1, + false, + false); + + /* + * Give this constraint partition-type dependencies on the + * parent constraint as well as the table. + */ + ObjectAddressSet(address, ConstraintRelationId, constrOid); + ObjectAddressSet(referenced, ConstraintRelationId, fk->conoid); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_PRI); + ObjectAddressSet(referenced, RelationRelationId, fk->conrelid); + recordDependencyOn(&address, &referenced, DEPENDENCY_PARTITION_SEC); + + /* And now create the action triggers that go with it */ + createForeignKeyActionTriggers(partRel, childoid, + fkconstraint, + constrOid, indexOid, + deleteActionTrigOid, updateActionTrigOid, + NULL, NULL); + + table_close(fchild, NoLock); + } + + table_close(refdRel, NoLock); + } + + ReleaseSysCache(parentConTup); ReleaseSysCache(contup); } list_free_deep(fks);