From 8f4e36c5fc65d4a88058467a73cbe423a5f0e91e Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 9 Sep 2024 19:56:18 +0800 Subject: [PATCH] test invalidation distribution --- contrib/test_decoding/Makefile | 2 +- .../expected/invalidation_distrubution.out | 173 ++++++++++++++++++ .../specs/invalidation_distrubution.spec | 56 ++++++ 3 files changed, 230 insertions(+), 1 deletion(-) create mode 100644 contrib/test_decoding/expected/invalidation_distrubution.out create mode 100644 contrib/test_decoding/specs/invalidation_distrubution.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index a4ba1a509a..eef7077067 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error catalog_change_snapshot \ - skip_snapshot_restore + skip_snapshot_restore invalidation_distrubution REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/invalidation_distrubution.out b/contrib/test_decoding/expected/invalidation_distrubution.out new file mode 100644 index 0000000000..cdc871d31d --- /dev/null +++ b/contrib/test_decoding/expected/invalidation_distrubution.out @@ -0,0 +1,173 @@ +Parsed test spec with 2 sessions + +starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); +?column? 
+-------- +init +(1 row) + +step s2_create_pub: CREATE PUBLICATION pub; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1; +step s1_commit: COMMIT; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +step s2_drop_pub: DROP PUBLICATION pub; +?column? +-------- +stop +(1 row) + + +starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_alter_pub_add_schema s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); +?column? +-------- +init +(1 row) + +step s2_create_pub: CREATE PUBLICATION pub; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_pub_add_schema: ALTER PUBLICATION pub ADD TABLES IN SCHEMA public; +step s1_commit: COMMIT; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +step s2_drop_pub: DROP PUBLICATION pub; +?column? +-------- +stop +(1 row) + + +starting permutation: s1_init s2_create_pub s2_alter_pub_add_tbl s1_begin s1_insert_tbl1 s2_alter_pub_drop_tbl s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); +?column? 
+-------- +init +(1 row) + +step s2_create_pub: CREATE PUBLICATION pub; +step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_pub_drop_tbl: ALTER PUBLICATION pub DROP TABLE tbl1; +step s1_commit: COMMIT; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +step s2_drop_pub: DROP PUBLICATION pub; +?column? +-------- +stop +(1 row) + + +starting permutation: s1_init s2_create_pub s2_alter_pub_add_schema s1_begin s1_insert_tbl1 s2_alter_pub_drop_schema s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); +?column? +-------- +init +(1 row) + +step s2_create_pub: CREATE PUBLICATION pub; +step s2_alter_pub_add_schema: ALTER PUBLICATION pub ADD TABLES IN SCHEMA public; +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_alter_pub_drop_schema: ALTER PUBLICATION pub DROP TABLES IN SCHEMA public; +step s1_commit: COMMIT; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +step s2_drop_pub: DROP PUBLICATION pub; +?column? 
+-------- +stop +(1 row) + + +starting permutation: s1_init s2_create_pub s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_begin s2_savepoint s2_alter_pub_add_tbl s2_commit s1_commit s1_insert_tbl1 s2_get_binary_changes s2_drop_pub +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); +?column? +-------- +init +(1 row) + +step s2_create_pub: CREATE PUBLICATION pub; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_begin: BEGIN; +step s2_savepoint: SAVEPOINT s1; +step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1; +step s2_commit: COMMIT; +step s1_commit: COMMIT; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_get_binary_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +step s2_drop_pub: DROP PUBLICATION pub; +?column? +-------- +stop +(1 row) + + +starting permutation: s2_create_pub s1_init s1_insert_tbl1 s1_begin s1_insert_tbl1 s2_set_streaming_mode s2_alter_pub_add_tbl s2_get_binary_stream_changes s1_insert_tbl1 s1_commit s2_get_binary_stream_changes s2_drop_pub +step s2_create_pub: CREATE PUBLICATION pub; +step s1_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); +?column? 
+-------- +init +(1 row) + +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_begin: BEGIN; +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s2_set_streaming_mode: SET debug_logical_replication_streaming = immediate; +step s2_alter_pub_add_tbl: ALTER PUBLICATION pub ADD TABLE tbl1; +step s2_get_binary_stream_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE get_byte(data, 0) = 73; +count +----- + 0 +(1 row) + +step s1_insert_tbl1: INSERT INTO tbl1 (val1, val2) VALUES (1, 1); +step s1_commit: COMMIT; +step s2_get_binary_stream_changes: SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE get_byte(data, 0) = 73; +count +----- + 1 +(1 row) + +step s2_drop_pub: DROP PUBLICATION pub; +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/invalidation_distrubution.spec b/contrib/test_decoding/specs/invalidation_distrubution.spec new file mode 100644 index 0000000000..0d3d328250 --- /dev/null +++ b/contrib/test_decoding/specs/invalidation_distrubution.spec @@ -0,0 +1,56 @@ +setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1(val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s1" +setup { SET synchronous_commit=on; } + +step "s1_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'pgoutput'); } +step "s1_begin" { BEGIN; } +step "s1_insert_tbl1" { INSERT INTO tbl1 (val1, val2) VALUES (1, 1); } +step "s1_commit" { COMMIT; } + +session "s2" +setup { SET synchronous_commit=on; } + +step "s2_begin" { BEGIN; } +step "s2_savepoint" { SAVEPOINT s1; } +step "s2_set_streaming_mode" { SET debug_logical_replication_streaming = immediate; } +step 
"s2_create_pub" { CREATE PUBLICATION pub; } +step "s2_alter_pub_add_tbl" { ALTER PUBLICATION pub ADD TABLE tbl1; } +step "s2_alter_pub_drop_tbl" { ALTER PUBLICATION pub DROP TABLE tbl1; } +step "s2_alter_pub_add_schema" { ALTER PUBLICATION pub ADD TABLES IN SCHEMA public; } +step "s2_alter_pub_drop_schema" { ALTER PUBLICATION pub DROP TABLES IN SCHEMA public; } +step "s2_drop_pub" { DROP PUBLICATION pub; } + + +step "s2_get_binary_stream_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub', 'streaming', 'on') WHERE get_byte(data, 0) = 73; } +step "s2_commit" { COMMIT; } + +step "s2_get_binary_changes" { SELECT count(data) FROM pg_logical_slot_get_binary_changes('isolation_slot', NULL, NULL, 'proto_version', '4', 'publication_names', 'pub') WHERE get_byte(data, 0) = 73; } + +# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I' +permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub" + +# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I' +permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_add_schema" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub" + +# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I' +permutation "s1_init" "s2_create_pub" "s2_alter_pub_add_tbl" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_drop_tbl" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub" + +# Expect to get one insert change. LOGICAL_REP_MSG_INSERT = 'I' +permutation "s1_init" "s2_create_pub" "s2_alter_pub_add_schema" "s1_begin" "s1_insert_tbl1" "s2_alter_pub_drop_schema" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub" + +# Expect to get one insert change. 
LOGICAL_REP_MSG_INSERT = 'I' +permutation "s1_init" "s2_create_pub" "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_begin" "s2_savepoint" "s2_alter_pub_add_tbl" "s2_commit" "s1_commit" "s1_insert_tbl1" "s2_get_binary_changes" "s2_drop_pub" + +# Streaming case: expect no insert change from the first s2_get_binary_stream_changes and one insert change from the second. LOGICAL_REP_MSG_INSERT = 'I' +permutation "s2_create_pub" "s1_init" "s1_insert_tbl1" "s1_begin" "s1_insert_tbl1" "s2_set_streaming_mode" "s2_alter_pub_add_tbl" "s2_get_binary_stream_changes" "s1_insert_tbl1" "s1_commit" "s2_get_binary_stream_changes" "s2_drop_pub" -- 2.30.0.windows.2