From 8640a4605f32f8072d093902da0af23dfa6d119b Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 20 Nov 2024 16:34:04 +0800 Subject: [PATCH v10] combine functions --- src/backend/commands/publicationcmds.c | 219 ++++++++++--------------- src/backend/utils/cache/relcache.c | 38 ++--- src/include/commands/publicationcmds.h | 7 +- 3 files changed, 105 insertions(+), 159 deletions(-) diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 053877c524..0d5daf7626 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -336,21 +336,36 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, } /* - * Check if all columns referenced in the REPLICA IDENTITY are covered by - * the column list. + * Check for invalid columns in the publication table definition. * - * Returns true if any replica identity column is not covered by column list. + * This function evaluates two conditions: + * + * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered + * by the column list. If any column is missing, *invalid_column_list is set + * to true. + * + * 2. Ensures that the REPLICA IDENTITY does not contain unpublished generated + * columns. If an unpublished generated column is found, + * *unpublished_gen_col is set to true. + * + * Returns true if any of the above conditions are not met. */ bool -pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, - bool pubviaroot) +pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, + bool pubviaroot, bool pubgencols, + bool *invalid_column_list, + bool *unpublished_gen_col) { - HeapTuple tuple; Oid relid = RelationGetRelid(relation); Oid publish_as_relid = RelationGetRelid(relation); - bool result = false; - Datum datum; - bool isnull; + Bitmapset *idattrs; + Bitmapset *columns = NULL; + TupleDesc desc = RelationGetDescr(relation); + Publication *pub; + int x; + + *invalid_column_list = false; + *unpublished_gen_col = false; /* * For a partition, if pubviaroot is true, find the topmost ancestor that @@ -368,158 +383,90 @@ pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestor publish_as_relid = relid; } - tuple = SearchSysCache2(PUBLICATIONRELMAP, - ObjectIdGetDatum(publish_as_relid), - ObjectIdGetDatum(pubid)); - - if (!HeapTupleIsValid(tuple)) - return false; - - datum = SysCacheGetAttr(PUBLICATIONRELMAP, tuple, - Anum_pg_publication_rel_prattrs, - &isnull); + /* Fetch the column list */ + pub = GetPublication(pubid); + check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns); - if (!isnull) + if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) { - int x; - Bitmapset *idattrs; - Bitmapset *columns = NULL; - /* With REPLICA IDENTITY FULL, no column list is allowed. */ - if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - result = true; - - /* Transform the column list datum to a bitmapset. */ - columns = pub_collist_to_bitmapset(NULL, datum, NULL); - - /* Remember columns that are part of the REPLICA IDENTITY */ - idattrs = RelationGetIndexAttrBitmap(relation, - INDEX_ATTR_BITMAP_IDENTITY_KEY); + *invalid_column_list = (columns != NULL); /* - * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are - * offset (to handle system columns the usual way), while column list - * does not use offset, so we can't do bms_is_subset(). Instead, we - * have to loop over the idattrs and check all of them are in the - * list. + * REPLICA IDENTITY can be FULL only if there is no column list for + * publication. When REPLICA IDENTITY is FULL and the relation + * includes a generated column, but the publish_generated_columns + * option is set to false, this scenario is invalid. */ - x = -1; - while ((x = bms_next_member(idattrs, x)) >= 0) - { - AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber); - - /* - * If pubviaroot is true, we are validating the column list of the - * parent table, but the bitmap contains the replica identity - * information of the child table. The parent/child attnums may - * not match, so translate them to the parent - get the attname - * from the child, and look it up in the parent. - */ - if (pubviaroot) - { - /* attribute name in the child table */ - char *colname = get_attname(relid, attnum, false); - - /* - * Determine the attnum for the attribute name in parent (we - * are using the column list defined on the parent). - */ - attnum = get_attnum(publish_as_relid, colname); - } - - /* replica identity column, not covered by the column list */ - if (!bms_is_member(attnum, columns)) - { - result = true; - break; - } - } + if (!pubgencols && relation->rd_att->constr && + relation->rd_att->constr->has_generated_stored) + *unpublished_gen_col = true; - bms_free(idattrs); - bms_free(columns); + if (*unpublished_gen_col && *invalid_column_list) + return true; } - ReleaseSysCache(tuple); - - return result; -} - -/* - * Check if REPLICA IDENTITY consists of any unpublished generated column. - * - * Returns true if any replica identity column is an unpublished generated - * column. - */ -bool -replident_has_unpublished_gen_col(Oid pubid, Relation relation, List *ancestors, - bool pubviaroot) -{ - Oid relid = RelationGetRelid(relation); - Oid publish_as_relid = RelationGetRelid(relation); - bool result = false; - bool found; - Publication *pub; - - /* Return if the table does not contain any generated columns */ - if (!relation->rd_att->constr || - !relation->rd_att->constr->has_generated_stored) - return false; + /* Remember columns that are part of the REPLICA IDENTITY */ + idattrs = RelationGetIndexAttrBitmap(relation, + INDEX_ATTR_BITMAP_IDENTITY_KEY); /* - * For a partition, if pubviaroot is true, find the topmost ancestor that - * is published via this publication as we need to use its column list for - * the changes. - * - * Note that even though the column list used is for an ancestor, the - * REPLICA IDENTITY used will be for the actual child table. + * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset + * (to handle system columns the usual way), while column list does not + * use offset, so we can't do bms_is_subset(). Instead, we have to loop + * over the idattrs and check all of them are in the list. */ - if (pubviaroot && relation->rd_rel->relispartition) + x = -1; + while ((x = bms_next_member(idattrs, x)) >= 0) { - publish_as_relid = GetTopMostAncestorInPublication(pubid, ancestors, NULL); + AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber); + Form_pg_attribute att = TupleDescAttr(desc, attnum - 1); - if (!OidIsValid(publish_as_relid)) - publish_as_relid = relid; - } + Assert(!att->attisdropped); - pub = GetPublication(pubid); - found = check_and_fetch_column_list(pub, publish_as_relid, NULL, NULL); + /* Check if generated column is part of REPLICA IDENTITY */ + *unpublished_gen_col |= att->attgenerated; - if (!found) - { - TupleDesc desc = RelationGetDescr(relation); - Bitmapset *idattrs; - int x; + if (columns == NULL) + { + /* Break the loop if unpublished generated columns exist. */ + if (*unpublished_gen_col) + break; + + /* Skip validating the column list since it is not defined */ + continue; + } /* - * REPLICA IDENTITY can be FULL only if there is no column list for - * publication. If REPLICA IDENTITY is set as FULL and relation has a - * generated column we should error out. + * If pubviaroot is true, we are validating the column list of the + * parent table, but the bitmap contains the replica identity + * information of the child table. The parent/child attnums may not + * match, so translate them to the parent - get the attname from the + * child, and look it up in the parent. */ - if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) - return true; - - /* Remember columns that are part of the REPLICA IDENTITY */ - idattrs = RelationGetIndexAttrBitmap(relation, - INDEX_ATTR_BITMAP_IDENTITY_KEY); - - x = -1; - while ((x = bms_next_member(idattrs, x)) >= 0) + if (pubviaroot) { - AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber); - Form_pg_attribute att = TupleDescAttr(desc, attnum - 1); + /* attribute name in the child table */ + char *colname = get_attname(relid, attnum, false); - /* Check if generated column is part of REPLICA IDENTITY */ - if (!att->attisdropped && att->attgenerated) - { - result = true; - break; - } + /* + * Determine the attnum for the attribute name in parent (we are + * using the column list defined on the parent). + */ + attnum = get_attnum(publish_as_relid, colname); } - bms_free(idattrs); + /* replica identity column, not covered by the column list */ + *invalid_column_list |= !bms_is_member(attnum, columns); + + if (*invalid_column_list && *unpublished_gen_col) + break; } - return result; + bms_free(columns); + bms_free(idattrs); + + return *invalid_column_list || *unpublished_gen_col; } /* check_functions_in_node callback */ diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index be8f8eea8f..c1199596bf 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5783,6 +5783,8 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) Oid pubid = lfirst_oid(lc); HeapTuple tup; Form_pg_publication pubform; + bool invalid_column_list; + bool unpublished_gen_col; tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); @@ -5817,33 +5819,27 @@ RelationBuildPublicationDesc(Relation relation, PublicationDesc *pubdesc) /* * Check if all columns are part of the REPLICA IDENTITY index or not. * - * If the publication is FOR ALL TABLES then it means the table has no - * column list and we can skip the validation. - */ - if (!pubform->puballtables && - (pubform->pubupdate || pubform->pubdelete) && - pub_collist_contains_invalid_column(pubid, relation, ancestors, - pubform->pubviaroot)) - { - if (pubform->pubupdate) - pubdesc->cols_valid_for_update = false; - if (pubform->pubdelete) - pubdesc->cols_valid_for_delete = false; - } - - /* * Check if all generated columns included in the REPLICA IDENTITY are * published. */ - if (!pubform->pubgencols && - (pubform->pubupdate || pubform->pubdelete) && - replident_has_unpublished_gen_col(pubid, relation, ancestors, - pubform->pubviaroot)) + if ((pubform->pubupdate || pubform->pubdelete) && + pub_contains_invalid_column(pubid, relation, ancestors, + pubform->pubviaroot, + pubform->pubgencols, + &invalid_column_list, + &unpublished_gen_col)) { if (pubform->pubupdate) - pubdesc->replident_valid_for_update = false; + { + pubdesc->cols_valid_for_update = !invalid_column_list; + pubdesc->replident_valid_for_update = !unpublished_gen_col; + } + if (pubform->pubdelete) - pubdesc->replident_valid_for_delete = false; + { + pubdesc->cols_valid_for_delete = !invalid_column_list; + pubdesc->replident_valid_for_delete = !unpublished_gen_col; + } } ReleaseSysCache(tup); diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index b18e576b77..fbf78ea08e 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -33,8 +33,11 @@ extern void AlterPublicationOwner_oid(Oid subid, Oid newOwnerId); extern void InvalidatePublicationRels(List *relids); extern bool pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, bool pubviaroot); -extern bool pub_collist_contains_invalid_column(Oid pubid, Relation relation, - List *ancestors, bool pubviaroot); +extern bool pub_contains_invalid_column(Oid pubid, Relation relation, + List *ancestors, bool pubviaroot, + bool pubgencols, + bool *invalid_column_list, + bool *unpublished_gen_col); extern bool replident_has_unpublished_gen_col(Oid pubid, Relation relation, List *ancestors, bool pubviaroot); -- 2.30.0.windows.2