From 1fc88208de13fae8d287d456b899d4ebe01558be Mon Sep 17 00:00:00 2001
From: Hayato Kuroda
Date: Fri, 31 Jan 2025 21:45:31 +0900
Subject: [PATCH v20250131] Try to add tests

---
 contrib/test_decoding/Makefile            |  2 +-
 contrib/test_decoding/expected/filter.out | 26 +++++++++++++++++
 contrib/test_decoding/meson.build         |  1 +
 contrib/test_decoding/sql/filter.sql      | 16 +++++++++++
 contrib/test_decoding/test_decoding.c     | 34 +++++++++++++++++++++++
 5 files changed, 78 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/filter.out
 create mode 100644 contrib/test_decoding/sql/filter.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a4ba1a509a..a66ab2cbf8 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream filter
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
diff --git a/contrib/test_decoding/expected/filter.out b/contrib/test_decoding/expected/filter.out
new file mode 100644
index 0000000000..a3b43f2772
--- /dev/null
+++ b/contrib/test_decoding/expected/filter.out
@@ -0,0 +1,26 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- Create two tables
+CREATE TABLE test(id int);
+CREATE TABLE test_skipped(id int);
+-- Changes for XXX_skipped are skipped
+BEGIN;
+INSERT INTO test_skipped VALUES (1);
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 54d65d3f30..9f1f98e6e2 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -41,6 +41,7 @@ tests += {
       'stats',
       'twophase',
       'twophase_stream',
+      'filter',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/sql/filter.sql b/contrib/test_decoding/sql/filter.sql
new file mode 100644
index 0000000000..08ec80eb03
--- /dev/null
+++ b/contrib/test_decoding/sql/filter.sql
@@ -0,0 +1,16 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- Create two tables
+CREATE TABLE test(id int);
+CREATE TABLE test_skipped(id int);
+
+-- Changes for XXX_skipped are skipped
+BEGIN;
+INSERT INTO test_skipped VALUES (1);
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 0113b19636..789a4bc2aa 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -68,6 +68,9 @@ static void pg_decode_truncate(LogicalDecodingContext *ctx,
 							   ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 							 RepOriginId origin_id);
+static bool pg_decode_filter_change(LogicalDecodingContext *ctx, Oid relid,
+									ReorderBufferChangeType change_type,
+									bool in_txn, bool *cache_valid);
 static void pg_decode_message(LogicalDecodingContext *ctx,
 							  ReorderBufferTXN *txn, XLogRecPtr lsn,
 							  bool transactional, const char *prefix,
@@ -133,6 +136,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
+	cb->filter_change_cb = pg_decode_filter_change;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
 	cb->filter_prepare_cb = pg_decode_filter_prepare;
@@ -467,6 +471,36 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Change-filter callback used by the "filter" regression test.
+ *
+ * Returns true (filter the change out) when in_txn is set and the name of
+ * the relation identified by relid contains "_skipped"; returns false
+ * otherwise.  ctx and change_type are unused, and *cache_valid is left
+ * untouched -- NOTE(review): confirm what the caller expects there.
+ */
+static bool
+pg_decode_filter_change(LogicalDecodingContext *ctx, Oid relid,
+						ReorderBufferChangeType change_type, bool in_txn,
+						bool *cache_valid)
+{
+	Relation	relation;
+	bool		skip;
+
+	/* Only look the relation up when the caller reports in_txn. */
+	if (!in_txn)
+		return false;
+
+	relation = RelationIdGetRelation(relid);
+
+	/* No usable relcache entry (e.g. relation is gone): do not filter. */
+	if (!RelationIsValid(relation))
+		return false;
+
+	skip = strstr(RelationGetRelationName(relation), "_skipped") != NULL;
+
+	/* Drop the reference taken by RelationIdGetRelation() to avoid a leak. */
+	RelationClose(relation);
+
+	return skip;
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
-- 
2.43.5