From ef7699a6f5ebe5d76f2f7ce14975b515a97df925 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Wed, 6 Apr 2022 14:47:36 +0200 Subject: [PATCH v1] Automatically replicate identity sequences with their tables This automatically includes an identity sequence into a publication if the owning table is part of the publication (directly or indirectly). --- src/backend/catalog/pg_publication.c | 10 ++++++++++ src/backend/replication/pgoutput/pgoutput.c | 18 ++++++++++++++++++ src/test/subscription/t/030_sequences.pl | 18 +++++++++++++++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 9fe3b18926..0f9bf1a28f 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1392,6 +1392,7 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS) { List *relids, *schemarelids; + ListCell *lc; relids = GetPublicationRelations(publication->oid, PUB_OBJTYPE_SEQUENCE, @@ -1404,6 +1405,15 @@ pg_get_publication_sequences(PG_FUNCTION_ARGS) PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); sequences = list_concat_unique_oid(relids, schemarelids); + + foreach(lc, GetPublicationRelations(publication->oid, + PUB_OBJTYPE_TABLE, + publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF)) + { + Oid relid = lfirst_oid(lc); + + sequences = list_concat_unique_oid(sequences, getOwnedSequences(relid)); + } } funcctx->user_fctx = (void *) sequences; diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9d33630464..1044458a47 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -13,6 +13,7 @@ #include "postgres.h" #include "access/tupconvert.h" +#include "catalog/dependency.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_namespace.h" @@ -2206,6 +2207,23 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) } } + if (relation->rd_rel->relkind == RELKIND_SEQUENCE) + { + Oid tableId; + int32 colId; + + if (sequenceIsOwned(relid, DEPENDENCY_INTERNAL, &tableId, &colId)) + { + if (GetRelationPublications(tableId) || + GetSchemaPublications(get_rel_namespace(tableId), + PUB_OBJTYPE_TABLE) || + pub->alltables) + { + ancestor_published = true; + } + } + } + if (list_member_oid(pubids, pub->oid) || list_member_oid(schemaPubids, pub->oid) || ancestor_published) diff --git a/src/test/subscription/t/030_sequences.pl b/src/test/subscription/t/030_sequences.pl index 9ae3c03d7d..7b5b735bff 100644 --- a/src/test/subscription/t/030_sequences.pl +++ b/src/test/subscription/t/030_sequences.pl @@ -22,6 +22,7 @@ my $ddl = qq( CREATE TABLE seq_test (v BIGINT); CREATE SEQUENCE s; + CREATE TABLE identity_test (v BIGINT GENERATED BY DEFAULT AS IDENTITY); ); # Setup structure on the publisher @@ -33,6 +34,7 @@ CREATE TABLE seq_test (v BIGINT); CREATE SEQUENCE s; CREATE SEQUENCE s2; + CREATE TABLE identity_test (v BIGINT GENERATED BY DEFAULT AS IDENTITY); ); $node_subscriber->safe_psql('postgres', $ddl); @@ -45,8 +47,14 @@ $node_publisher->safe_psql('postgres', "ALTER PUBLICATION seq_pub ADD SEQUENCE s"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tab_pub"); + +$node_publisher->safe_psql('postgres', + "ALTER PUBLICATION tab_pub ADD TABLE identity_test"); + $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub" + "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub, tab_pub" ); $node_publisher->wait_for_catchup('seq_sub'); @@ -62,6 +70,7 @@ 'postgres', qq( -- generate a number of values using the sequence INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100); + INSERT INTO identity_test SELECT * FROM generate_series(1,100); )); $node_publisher->wait_for_catchup('seq_sub'); @@ -75,6 +84,13 @@ is( $result, '132|0|t', 'initial test data replicated'); +is($node_subscriber->safe_psql('postgres', qq(SELECT max(v) FROM identity_test)), + '100', + 'identity table replicated'); + +is($node_subscriber->safe_psql('postgres', qq(SELECT * FROM identity_test_v_seq)), + '100|0|t', + 'identity sequence advanced'); # advance the sequence in a rolled-back transaction - the rollback # does not wait for the replication, so we could see any intermediate state -- 2.35.1