use strict; use warnings FATAL => 'all'; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; # no. of concurrent txn my $concurrent_txns = 100; # max_connections to be set my $max_connections = $concurrent_txns + 100; # Setup node my $node = PostgreSQL::Test::Cluster->new('publisher'); $node->init(allows_streaming => 'logical'); $node->append_conf( 'postgresql.conf', q[ shared_buffers = 10GB max_worker_processes = 32 max_parallel_maintenance_workers = 24 max_parallel_workers = 32 synchronous_commit = on checkpoint_timeout = 1d max_wal_size = 24GB min_wal_size = 15GB autovacuum = off logical_decoding_work_mem = 10GB ]); $node->append_conf('postgresql.conf', "max_connections = $max_connections"); $node->start; # table t1 is not part of any publication $node->safe_psql( 'postgres', qq( CREATE TABLE t1(a int); CREATE TABLE tab_conc1(a int); CREATE PUBLICATION regress_pub1 for table tab_conc1,t1; SELECT * FROM pg_create_logical_replication_slot('regression_slot1', 'pgoutput'); )); my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default; my @background_psqls; # create $concurrent_txns sessions foreach my $i (1 .. $concurrent_txns) { my $background_psql = $node->background_psql( 'postgres', on_error_stop => 0, timeout => $psql_timeout_secs); push (@background_psqls, $background_psql); } # begin txn in each session foreach my $background_psql (@background_psqls) { $background_psql->query_safe('BEGIN;'); $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (11);]); } # unrelated invalidate message is distributed to all $concurrent_txns foreach my $i (1 .. 1) { $node->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 DROP TABLE t1;'); $node->safe_psql('postgres', 'ALTER PUBLICATION regress_pub1 ADD TABLE t1;'); } foreach my $background_psql (@background_psqls) { $background_psql->query_safe(qq[INSERT INTO tab_conc1 VALUES (12);]); $background_psql->query_safe('COMMIT;'); $background_psql->quit; } # Measure the decoding time via SQL interface - pg_logical_slot_get_binary_changes my $start = Time::HiRes::gettimeofday(); my $result = $node->safe_psql('postgres', "SELECT count(*) FROM pg_logical_slot_get_binary_changes('regression_slot1', NULL, NULL, 'proto_version', '4', 'publication_names', 'regress_pub1');"); my $end = Time::HiRes::gettimeofday(); printf("time elapsed %f\n", $end - $start); is($result, 4*$concurrent_txns+1, 'changes were decoded properly'); $node->stop('fast'); done_testing();