Re: row filtering for logical replication

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: Euler Taveira <euler(at)eulerto(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Rahila Syed <rahilasyed90(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Önder Kalacı <onderkalaci(at)gmail(dot)com>, japin <japinli(at)hotmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, David Steele <david(at)pgmasters(dot)net>, Craig Ringer <craig(at)2ndquadrant(dot)com>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Amit Langote <amitlangote09(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: row filtering for logical replication
Date: 2021-07-02 07:29:59
Message-ID: CAHut+Ps3GgPKUJ2npfY4bQdxAmYW+yQin+hQuBsMYvX=kBqEpA@mail.gmail.com
Lists: pgsql-hackers

Hi.

I have been looking at the latest patch set (v16). Below are my review
comments and some patches.

The patches are:
v16-0001. This is identical to your previously posted 0001 patch. (I
only attached it again hoping it can allow the cfbot to keep working
OK).
v16-0002,0003. These are for demonstrating some of the review comments
v16-0004. This is a POC plan cache for your consideration.

//////////

REVIEW COMMENTS
===============

1. Patch 0001 comment - typo

you can optionally filter rows that does not satisfy a WHERE condition

Typo: "does" should be "do".

~~

2. Patch 0001 comment - typo

The WHERE clause should probably contain only columns that are part of
the primary key or that are covered by REPLICA IDENTITY. Otherwise,
and DELETEs won't be replicated.

Typo: "Otherwise, and DELETEs" seems to be missing a word -- perhaps
it should say "Otherwise, UPDATEs and DELETEs won't be replicated".

~~

3. Patch 0001 comment - typo and clarification

If your publication contains partitioned table, the parameter
publish_via_partition_root determines if it uses the partition row filter (if
the parameter is false -- default) or the partitioned table row filter.

Typo: "contains partitioned table" -> "contains a partitioned table"

Also, perhaps the text "or the partitioned table row filter." should
say "or the root partitioned table row filter." to disambiguate the
case where there are more levels of partitions like A->B->C. e.g. What
filter does C use?

~~

4. src/backend/catalog/pg_publication.c - misleading names

-publication_add_relation(Oid pubid, Relation targetrel,
+publication_add_relation(Oid pubid, PublicationRelationInfo *targetrel,
bool if_not_exists)

Leaving this parameter name as "targetrel" seems a bit misleading now
in the function code. Maybe this should be called something like "pri"
which is consistent with other places where you have declared
PublicationRelationInfo.

Also, consider declaring some local variables so that the patch has
less impact on existing code. e.g.
Oid relid = pri->relid;
Relation targetrel = pri->relation;

~~

5. src/backend/commands/publicationcmds.c - simplify code

- rels = OpenTableList(stmt->tables);
+ if (stmt->tableAction == DEFELEM_DROP)
+ rels = OpenTableList(stmt->tables, true);
+ else
+ rels = OpenTableList(stmt->tables, false);

Consider writing that code more simply as just:

rels = OpenTableList(stmt->tables, stmt->tableAction == DEFELEM_DROP);

~~

6. src/backend/commands/publicationcmds.c - bug?

- CloseTableList(rels);
+ CloseTableList(rels, false);
}

Is this a potential bug? When OpenTableList was called, the 2nd param
may have been either true or false, so is it correct to pass false
unconditionally here? I am not sure.

~~

7. src/backend/commands/publicationcmds.c - OpenTableList function comment.

* Open relations specified by a RangeVar list.
+ * AlterPublicationStmt->tables has a different list element, hence, is_drop
+ * indicates if it has a RangeVar (true) or PublicationTable (false).
* The returned tables are locked in ShareUpdateExclusiveLock mode in order to
* add them to a publication.

I am not sure about this. Should that comment instead say "indicates
if it has a Relation (true) or PublicationTable (false)"?

~~

8. src/backend/commands/publicationcmds.c - OpenTableList

- RangeVar *rv = castNode(RangeVar, lfirst(lc));
- bool recurse = rv->inh;
+ PublicationTable *t = NULL;
+ RangeVar *rv;
+ bool recurse;
Relation rel;
Oid myrelid;

+ if (is_drop)
+ {
+ rv = castNode(RangeVar, lfirst(lc));
+ }
+ else
+ {
+ t = lfirst(lc);
+ rv = castNode(RangeVar, t->relation);
+ }
+
+ recurse = rv->inh;
+

For some reason it feels kind of clunky to me for this function to be
processing the list differently according to the 2nd param. e.g. the
name "is_drop" seems quite unrelated to the function code, and more to
do with where it was called from. Sorry, I don't have any better ideas
for improvement atm.

~~

9. src/backend/commands/publicationcmds.c - OpenTableList bug?

- rels = lappend(rels, rel);
+ pri = palloc(sizeof(PublicationRelationInfo));
+ pri->relid = myrelid;
+ pri->relation = rel;
+ if (!is_drop)
+ pri->whereClause = t->whereClause;
+ rels = lappend(rels, pri);

This looks like a possible bug: there is no code explicitly assigning
whereClause = NULL when "is_drop" is true, so it may contain a garbage
value which could cause problems later. Using palloc0 instead of
palloc would fix this.

The same problem occurs twice in this function.

~~

10. src/backend/commands/publicationcmds.c - CloseTableList function comment

@@ -587,16 +609,28 @@ OpenTableList(List *tables)
* Close all relations in the list.
*/
static void
-CloseTableList(List *rels)
+CloseTableList(List *rels, bool is_drop)
{

Probably the meaning of "is_drop" should be described in this function comment.

~~

11. src/backend/replication/pgoutput/pgoutput.c - get_rel_sync_entry signature.

-static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
+static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation rel);

I see that this function signature is modified but I did not see how
this parameter refactoring is actually related to the RowFilter patch.
Perhaps I am mistaken, but IIUC this only changes the relid =
RelationGetRelid(rel); to be done inside this function instead of
being done outside by the callers.

It impacts other code like in pgoutput_truncate:

@@ -689,12 +865,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
for (i = 0; i < nrelations; i++)
{
Relation relation = relations[i];
- Oid relid = RelationGetRelid(relation);

if (!is_publishable_relation(relation))
continue;

- relentry = get_rel_sync_entry(data, relid);
+ relentry = get_rel_sync_entry(data, relation);

if (!relentry->pubactions.pubtruncate)
continue;
@@ -704,10 +879,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* root tables through it.
*/
if (relation->rd_rel->relispartition &&
- relentry->publish_as_relid != relid)
+ relentry->publish_as_relid != relentry->relid)
continue;

- relids[nrelids++] = relid;
+ relids[nrelids++] = relentry->relid;
maybe_send_schema(ctx, txn, change, relation, relentry);
}

So this may or may not be a good refactor, but I felt it should not be
included as part of the RowFilter patch unless it is really necessary.

~~

12. src/backend/replication/pgoutput/pgoutput.c - missing function comments

The static functions create_estate_for_relation and
pgoutput_row_filter_prepare_expr probably should be commented.

~~

13. src/backend/replication/pgoutput/pgoutput.c -
pgoutput_row_filter_prepare_expr function name

+static ExprState *pgoutput_row_filter_prepare_expr(Node *rfnode, EState *estate);

This function has an unfortunate name with the word "prepare" in it. I
wonder if a different name can be found for this function to avoid any
confusion with pgoutput functions (coming soon) which are related to
the two-phase commit "prepare".

~~

14. src/bin/psql/describe.c

+ if (!PQgetisnull(tabres, j, 2))
+ appendPQExpBuffer(&buf, " WHERE (%s)",
+ PQgetvalue(tabres, j, 2));

Because the where-clause value already has enclosing parentheses so
using " WHERE (%s)" seems overkill here. e.g. you can see the effect
in your src/test/regress/expected/publication.out file. I think this
should be changed to " WHERE %s" to give better output.

~~

15. src/include/catalog/pg_publication.h - new typedef

+typedef struct PublicationRelationInfo
+{
+ Oid relid;
+ Relation relation;
+ Node *whereClause;
+} PublicationRelationInfo;
+

The new PublicationRelationInfo should also be added
src/tools/pgindent/typedefs.list

~~

16. src/include/nodes/parsenodes.h - new typedef

+typedef struct PublicationTable
+{
+ NodeTag type;
+ RangeVar *relation; /* relation to be published */
+ Node *whereClause; /* qualifications */
+} PublicationTable;

The new PublicationTable should also be added src/tools/pgindent/typedefs.list

~~

17. sql/publication.sql - show more output

+CREATE PUBLICATION testpub5 FOR TABLE testpub_rf_tbl1, testpub_rf_tbl2 WHERE (c <> 'test' AND d < 5);
+RESET client_min_messages;
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl3 WHERE (e > 1000 AND e < 2000);
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl2;
+-- remove testpub_rf_tbl1 and add testpub_rf_tbl3 again (another WHERE expression)
+ALTER PUBLICATION testpub5 SET TABLE testpub_rf_tbl3 WHERE (e > 300 AND e < 500);
+-- fail - functions disallowed
+ALTER PUBLICATION testpub5 ADD TABLE testpub_rf_tbl4 WHERE (length(g) < 6);
+-- fail - WHERE not allowed in DROP
+ALTER PUBLICATION testpub5 DROP TABLE testpub_rf_tbl3 WHERE (e < 27);
+\dRp+ testpub5

I felt that it would be better to have a "\dRp+ testpub5" after each
of the valid ALTER PUBLICATION steps to show the intermediate results
also; not just the final one at the end.

(PSA a temp patch showing what I mean by this review comment)

~~

18. src/test/subscription/t/020_row_filter.pl - rename file

I think this file should be renamed to 021_row_filter.pl as there is
already an 020 TAP test present.

~~

19. src/test/subscription/t/020_row_filter.pl - test comments

AFAIK the test cases are all OK, but it was really quite hard to
review these TAP tests to try to determine what the expected results
should be.

I found that I had to add my own comments to the file so I could
understand what was going on, so I think the TAP test can benefit lots
from having many more comments describing how the expected results are
determined.

Also, the filtering does not take place at the INSERT but really it is
affected only by which publications the subscription has subscribed
to. So I thought some of the existing comments (although correct) are
misplaced.

(PSA a temp patch showing what I mean by this review comment)

~~

20. src/test/subscription/t/020_row_filter.pl - missing test case?

There are some partition tests, but I did not see any test that was
like 3 levels deep like A->B->C, so I was not sure if there is any
case C would ever make use of the filter of its parent B, or would it
only use the filter of the root A?

~~

21. src/test/subscription/t/020_row_filter.pl - missing test case?

If the same table is in multiple publications they can each have a row
filter. And a subscription might subscribe to some but not all of
those publications. I think this scenario is only partly tested.

e.g.
pub_1 has tableX with RowFilter1
pub_2 has tableX with RowFilter2

Then sub_12 subscribes to pub_1, pub_2
This is already tested in your TAP test (I think) and it makes sure
both filters are applied.

But if there was also
pub_3 has tableX with RowFilter3

Then sub_12 should still be checking only RowFilter1 AND RowFilter2
(but NOT RowFilter3). I think this scenario is not tested.

////////////////

POC PATCH FOR PLAN CACHE
========================

PSA a POC patch for a plan cache which gets used inside the
pgoutput_row_filter function instead of calling prepare for every row.
I think this is implementing something like Andres was suggesting a
while back [1].

Measurements with/without this plan cache:

Time spent processing within the pgoutput_row_filter function
- Data was captured using the same technique as the
0002-Measure-row-filter-overhead.patch.
- Inserted 1000 rows, sampled data for the first 100 times in this function.
not cached: average ~ 28.48 us
cached: average ~ 9.75 us

Replication times:
- Using tables and row filters same as in Onder's commands_to_test_perf.sql [2]
100K rows - not cached: ~ 42sec, 43sec, 44sec
100K rows - cached: ~ 41sec, 42sec, 42sec

There does seem to be a tiny gain achieved by having the plan cache,
but I think the gain might be a lot less than what people were
expecting.

Unless there are millions of rows the speedup may be barely noticeable.

--------
[1] https://www.postgresql.org/message-id/20210128022032.eq2qqc6zxkqn5syt%40alap3.anarazel.de
[2] https://www.postgresql.org/message-id/CACawEhW_iMnY9XK2tEb1ig%2BA%2BgKeB4cxdJcxMsoCU0SaKPExxg%40mail.gmail.com

Kind Regards,
Peter Smith.
Fujitsu Australia

Attachment Content-Type Size
v16-0001-Row-filter-for-logical-replication.patch application/octet-stream 66.5 KB
v16-0002-PS-tmp-describe-intermediate-test-steps.patch application/octet-stream 5.1 KB
v16-0004-PS-POC-Implement-a-plan-cache-for-pgoutput.patch application/octet-stream 7.8 KB
v16-0003-PS-tmp-add-more-comments-for-expected-results.patch application/octet-stream 8.8 KB
