diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f7e51da..b9eaf2b 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -771,37 +771,37 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, */ table_state = opts.copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; - /* Add the sequences in init state */ - sequences = fetch_sequence_list(wrconn, publications); - foreach_ptr(RangeVar, rv, sequences) + /* + * Get the table list from publisher and build local table status + * info. + */ + tables = fetch_table_list(wrconn, publications); + foreach(lc, tables) { + RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); /* Check for supported relkind. */ CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + rv->schemaname, rv->relname); AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr, true); } - /* - * Get the table list from publisher and build local table status - * info. - */ - tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + /* Add the sequences in init state */ + sequences = fetch_sequence_list(wrconn, publications); + foreach_ptr(RangeVar, rv, sequences) { - RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); /* Check for supported relkind. */ CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + rv->schemaname, rv->relname); AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr, true); @@ -1028,7 +1028,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, RemoveSubscriptionRel(sub->oid, relid); - /* Stop the worker if relation kind is not sequence*/ + /* Stop the worker if relation kind is not sequence. */ if (relkind != RELKIND_SEQUENCE) logicalrep_worker_stop(sub->oid, relid); @@ -1106,7 +1106,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * Refresh the sequences data of the subscription. */ static void -AlterSubscription_refreshsequences(Subscription *sub) +AlterSubscription_refresh_sequences(Subscription *sub) { char *err; List *pubseq_names = NIL; @@ -1574,7 +1574,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES"); - AlterSubscription_refreshsequences(sub); + AlterSubscription_refresh_sequences(sub); break; } @@ -2235,13 +2235,11 @@ check_publications_origin(WalReceiverConn *wrconn, List *publications, for (i = 0; i < subrel_count; i++) { Oid relid = subrel_local_oids[i]; - char *schemaname; - char *tablename; if (get_rel_relkind(relid) != RELKIND_SEQUENCE) { - schemaname = get_namespace_name(get_rel_namespace(relid)); - tablename = get_rel_name(relid); + char *schemaname = get_namespace_name(get_rel_namespace(relid)); + char *tablename = get_rel_name(relid); appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n", schemaname, tablename); @@ -2427,14 +2425,14 @@ fetch_sequence_list(WalReceiverConn *wrconn, List *publications) TupleTableSlot *slot; Oid tableRow[2] = {TEXTOID, TEXTOID}; bool first; - List *tablelist = NIL; + List *seqlist = NIL; Assert(list_length(publications) > 0); initStringInfo(&cmd); appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.sequencename\n" - " FROM pg_catalog.pg_publication_sequences s\n" - " WHERE s.pubname IN ("); + " FROM pg_catalog.pg_publication_sequences s\n" + " WHERE s.pubname IN ("); first = true; foreach_ptr(String, pubname, publications) { @@ -2470,7 +2468,7 @@ fetch_sequence_list(WalReceiverConn *wrconn, List *publications) Assert(!isnull); rv = makeRangeVar(nspname, relname, -1); - tablelist = lappend(tablelist, rv); + seqlist = lappend(seqlist, rv); ExecClearTuple(slot); } @@ -2478,7 +2476,7 @@ fetch_sequence_list(WalReceiverConn *wrconn, List *publications) walrcv_clear_result(res); - return tablelist; + return seqlist; } /* diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 6770e26..f8dd93a 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -131,10 +131,10 @@ static const struct "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, { - "TablesyncWorkerMain", TablesyncWorkerMain + "TableSyncWorkerMain", TableSyncWorkerMain }, { - "SequencesyncWorkerMain", SequencesyncWorkerMain + "SequenceSyncWorkerMain", SequenceSyncWorkerMain } }; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 2451eca..4ab470f 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -276,18 +276,17 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) LogicalRepWorker * logicalrep_sequence_sync_worker_find(Oid subid, bool only_running) { - int i; LogicalRepWorker *res = NULL; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); /* Search for attached worker for a given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) + for (int i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; /* Skip non sequence sync workers. */ - if (!isSequencesyncWorker(w)) + if (!isSequenceSyncWorker(w)) continue; if (w->in_use && w->subid == subid && (only_running && w->proc)) @@ -331,15 +330,13 @@ logicalrep_workers_find(Oid subid, bool only_running) static LogicalRepWorker * logicalrep_apply_worker_find(Oid subid, bool only_running) { - int i; - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - for (i = 0; i < max_logical_replication_workers; i++) + for (int i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isApplyWorker(w) && w->subid == subid && (only_running && w->proc)) + if (isApplyWorker(w) && w->subid == subid && only_running && w->proc) { LWLockRelease(LogicalRepWorkerLock); return w; @@ -545,7 +542,7 @@ retry: break; case WORKERTYPE_TABLESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TableSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication tablesync worker for subscription %u sync %u", subid, @@ -554,7 +551,7 @@ retry: break; case WORKERTYPE_SEQUENCESYNC: - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequencesyncWorkerMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication sequencesync worker for subscription %u", subid); @@ -941,7 +938,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isTablesyncWorker(w) && w->subid == subid) + if (isTableSyncWorker(w) && w->subid == subid) res++; } @@ -1392,7 +1389,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (isTablesyncWorker(&worker)) + if (isTableSyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 92980e8..0ba8c1a 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -109,8 +109,8 @@ copy_sequence(WalReceiverConn *conn, Relation rel) if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("could not fetch sequence info for table \"%s.%s\" from publisher: %s", - nspname, RelationGetRelationName(rel), res->err))); + errmsg("sequence \"%s.%s\" info could not be fetched from publisher: %s", + nspname, relname, res->err))); slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) @@ -123,12 +123,11 @@ copy_sequence(WalReceiverConn *conn, Relation rel) Assert(!isnull); relkind = DatumGetChar(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + Assert(relkind == RELKIND_SEQUENCE); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - Assert(relkind == RELKIND_SEQUENCE); - /* * Logical replication of sequences is based on decoding WAL records, * describing the "next" state of the sequence the current state in the @@ -152,12 +151,12 @@ copy_sequence(WalReceiverConn *conn, Relation rel) * Start syncing the sequences in the sync worker. */ static void -LogicalRepSyncSequences() +LogicalRepSyncSequences(void) { char *err; bool must_use_password; - List *sequences; - char slotname[NAMEDATALEN]; + List *sequences; + char slotname[NAMEDATALEN]; AclResult aclresult; UserContext ucxt; bool run_as_owner = false; @@ -169,8 +168,7 @@ LogicalRepSyncSequences() /* Get the sequences that should be synchronized. */ StartTransactionCommand(); - sequences = GetSubscriptionSequences(subid, - SUBREL_STATE_INIT); + sequences = GetSubscriptionSequences(subid, SUBREL_STATE_INIT); CommitTransactionCommand(); /* Is the use of a password mandatory? */ @@ -197,7 +195,7 @@ LogicalRepSyncSequences() seq_count = list_length(sequences); foreach_ptr(SubscriptionRelState, seqinfo, sequences) { - Relation sequencerel; + Relation sequence_rel; XLogRecPtr sequence_lsn; int next_seq; @@ -206,7 +204,7 @@ LogicalRepSyncSequences() if (curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH == 0) StartTransactionCommand(); - sequencerel = table_open(seqinfo->relid, RowExclusiveLock); + sequence_rel = table_open(seqinfo->relid, RowExclusiveLock); /* * Make sure that the copy command runs as the sequence owner, unless the @@ -214,18 +212,18 @@ LogicalRepSyncSequences() */ run_as_owner = MySubscription->runasowner; if (!run_as_owner) - SwitchToUntrustedUser(sequencerel->rd_rel->relowner, &ucxt); + SwitchToUntrustedUser(sequence_rel->rd_rel->relowner, &ucxt); /* * Check that our sequence sync worker has permission to insert into the * target sequence. */ - aclresult = pg_class_aclcheck(RelationGetRelid(sequencerel), GetUserId(), + aclresult = pg_class_aclcheck(RelationGetRelid(sequence_rel), GetUserId(), ACL_INSERT); if (aclresult != ACLCHECK_OK) aclcheck_error(aclresult, - get_relkind_objtype(sequencerel->rd_rel->relkind), - RelationGetRelationName(sequencerel)); + get_relkind_objtype(sequence_rel->rd_rel->relkind), + RelationGetRelationName(sequence_rel)); /* * COPY FROM does not honor RLS policies. That is not a problem for @@ -234,28 +232,30 @@ LogicalRepSyncSequences() * circumvent RLS. Disallow logical replication into RLS enabled * relations for such roles. */ - if (check_enable_rls(RelationGetRelid(sequencerel), InvalidOid, false) == RLS_ENABLED) + if (check_enable_rls(RelationGetRelid(sequence_rel), InvalidOid, false) == RLS_ENABLED) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"", GetUserNameFromId(GetUserId(), true), - RelationGetRelationName(sequencerel))); + RelationGetRelationName(sequence_rel))); - sequence_lsn = copy_sequence(LogRepWorkerWalRcvConn, sequencerel); + sequence_lsn = copy_sequence(LogRepWorkerWalRcvConn, sequence_rel); UpdateSubscriptionRelState(subid, seqinfo->relid, SUBREL_STATE_READY, sequence_lsn); - table_close(sequencerel, NoLock); + table_close(sequence_rel, NoLock); next_seq = curr_seq + 1; if (((next_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) || next_seq == seq_count) { /* LOG all the sequences synchronized during current batch. */ int i = curr_seq - (curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH); + for (; i <= curr_seq; i++) { SubscriptionRelState *done_seq; + done_seq = (SubscriptionRelState *) lfirst(list_nth_cell(sequences, i)); ereport(LOG, errmsg("logical replication synchronization for subscription \"%s\", sequence \"%s\" has finished", @@ -274,7 +274,7 @@ LogicalRepSyncSequences() /* * Execute the initial sync with error handling. Disable the subscription, - * if it's required. + * if required. * * Allocate the slot name in long-lived context on return. Note that we don't * handle FATAL errors which are probably because of system resource error and @@ -310,9 +310,9 @@ start_sequence_sync() PG_END_TRY(); } -/* Logical Replication Sequencesync worker entry point */ +/* Logical Replication sequence sync worker entry point */ void -SequencesyncWorkerMain(Datum main_arg) +SequenceSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index a15b6cd..01f5a85 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -164,14 +164,14 @@ finish_sync_worker(bool istable) get_rel_name(MyLogicalRepWorker->relid))); else ereport(LOG, - errmsg("logical replication sequences synchronization worker for subscription \"%s\" has finished", + errmsg("logical replication sequence synchronization worker for subscription \"%s\" has finished", MySubscription->name)); CommitTransactionCommand(); /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - /* No need to set the failure time in case of a clean exit */ + /* No need to set the sequence failure time when it is a clean exit */ if (!istable) cancel_before_shmem_exit(logicalrep_seqsyncworker_failuretime, 0); @@ -683,13 +683,13 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * synchronization for them. * * If there is a sequence synchronization worker running already, no need to - * start a sequence synchronization in this case. The existing sequence - * sync worker will synchronize the sequences. If there are still any sequences - * to be synced after the sequence sync worker exited, then we new sequence - * sync worker can be started in the next iteration. To prevent starting the - * sequence sync worker at a high frequency after a failure, we store its last - * start time. We start the sync worker for the same relation after waiting - * at least wal_retrieve_retry_interval. + * start a new one; the existing sequence sync worker will synchronize all the + * sequences. If there are still any sequences to be synced after the sequence + * sync worker exited, then a new sequence sync worker can be started in the + * next iteration. To prevent starting the sequence sync worker at a high + * frequency after a failure, we store its last failure time. We start the sync + * worker for the same relation after waiting at least + * wal_retrieve_retry_interval. */ static void process_syncing_sequences_for_apply() @@ -702,7 +702,7 @@ process_syncing_sequences_for_apply() FetchTableStates(&started_tx); /* - * Start sequence sync worker if there is no sequence sync worker running. + * Start sequence sync worker if there is not one already. */ foreach_ptr(SubscriptionRelState, rstate, table_states_not_ready) { @@ -720,22 +720,19 @@ process_syncing_sequences_for_apply() continue; /* - * Check if there is a sequence worker running? + * Check if there is a sequence worker already running? */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); syncworker = logicalrep_sequence_sync_worker_find(MyLogicalRepWorker->subid, true); - /* - * If there is a sequence sync worker, the sequence sync worker - * will handle sync of this sequence. - */ if (syncworker) { /* Now safe to release the LWLock */ LWLockRelease(LogicalRepWorkerLock); break; } + else { /* @@ -750,13 +747,12 @@ process_syncing_sequences_for_apply() /* * If there are free sync worker slot(s), start a new sequence sync - * worker to sync the sequences and break from the loop, as this - * sequence sync worker will take care of synchronizing all the - * sequences that are in init state. + * worker, and break from the loop. */ if (nsyncworkers < max_sync_workers_per_subscription) { TimestampTz now = GetCurrentTimestamp(); + if (!MyLogicalRepWorker->sequencesync_failure_time || TimestampDifferenceExceeds(MyLogicalRepWorker->sequencesync_failure_time, now, wal_retrieve_retry_interval)) @@ -804,14 +800,13 @@ process_syncing_tables(XLogRecPtr current_lsn) break; case WORKERTYPE_APPLY: - process_syncing_sequences_for_apply(); process_syncing_tables_for_apply(current_lsn); + process_syncing_sequences_for_apply(); break; - /* Sequence sync is not expected to come here */ case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ Assert(0); - /* not reached, here to make compiler happy */ break; case WORKERTYPE_UNKNOWN: @@ -1837,7 +1832,7 @@ run_tablesync_worker() /* Logical Replication Tablesync worker entry point */ void -TablesyncWorkerMain(Datum main_arg) +TableSyncWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index d0b0715..63dff38 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -489,10 +489,9 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) (rel->state == SUBREL_STATE_SYNCDONE && rel->statelsn <= remote_final_lsn)); - /* Sequence sync is not expected to come here */ case WORKERTYPE_SEQUENCESYNC: + /* Should never happen. */ Assert(0); - /* not reached, here to make compiler happy */ break; case WORKERTYPE_UNKNOWN: @@ -4639,7 +4638,7 @@ InitializeLogRepWorker(void) get_rel_name(MyLogicalRepWorker->relid)))); else if (am_sequencesync_worker()) ereport(LOG, - (errmsg("logical replication sequences synchronization worker for subscription \"%s\" has started", + (errmsg("logical replication sequence synchronization worker for subscription \"%s\" has started", MySubscription->name))); else ereport(LOG, @@ -4689,7 +4688,7 @@ SetupApplyOrSyncWorker(int worker_slot) invalidate_syncing_table_states, (Datum) 0); - if (isSequencesyncWorker(MyLogicalRepWorker)) + if (isSequenceSyncWorker(MyLogicalRepWorker)) before_shmem_exit(logicalrep_seqsyncworker_failuretime, (Datum) 0); } diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index f380c1b..47a3326 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -18,8 +18,8 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); -extern void TablesyncWorkerMain(Datum main_arg); -extern void SequencesyncWorkerMain(Datum main_arg); +extern void TableSyncWorkerMain(Datum main_arg); +extern void SequenceSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 3701b15..502ecef 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -338,21 +338,21 @@ extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, (worker)->type == WORKERTYPE_APPLY) #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTableSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) -#define isSequencesyncWorker(worker) ((worker)->in_use && \ +#define isSequenceSyncWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_SEQUENCESYNC) static inline bool am_tablesync_worker(void) { - return isTablesyncWorker(MyLogicalRepWorker); + return isTableSyncWorker(MyLogicalRepWorker); } static inline bool am_sequencesync_worker(void) { - return isSequencesyncWorker(MyLogicalRepWorker); + return isSequenceSyncWorker(MyLogicalRepWorker); } static inline bool diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl index 94bf83a..7272efa 100644 --- a/src/test/subscription/t/034_sequences.pl +++ b/src/test/subscription/t/034_sequences.pl @@ -1,5 +1,5 @@ -# Copyright (c) 2021, PostgreSQL Global Development Group +# Copyright (c) 2024, PostgreSQL Global Development Group # This tests that sequences are synced correctly to the subscriber use strict; @@ -13,101 +13,109 @@ my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); $node_publisher->start; -# Create subscriber node +# Initialize subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->start; -# Create some preexisting content on publisher +# Setup structure on the publisher my $ddl = qq( CREATE TABLE seq_test (v BIGINT); - CREATE SEQUENCE s; + CREATE SEQUENCE s1; ); - -# Setup structure on the publisher $node_publisher->safe_psql('postgres', $ddl); -# Create some the same structure on subscriber, and an extra sequence that +# Setup the same structure on the subscriber, plus some extra sequences that # we'll create on the publisher later $ddl = qq( CREATE TABLE seq_test (v BIGINT); - CREATE SEQUENCE s; + CREATE SEQUENCE s1; CREATE SEQUENCE s2; CREATE SEQUENCE s3; ); - $node_subscriber->safe_psql('postgres', $ddl); -# Setup logical replication -my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; -$node_publisher->safe_psql('postgres', - "CREATE PUBLICATION seq_pub FOR ALL SEQUENCES"); - # Insert initial test data $node_publisher->safe_psql( 'postgres', qq( -- generate a number of values using the sequence - INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100); + INSERT INTO seq_test SELECT nextval('s1') FROM generate_series(1,100); )); +# Setup logical replication pub/sub +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION seq_pub FOR ALL SEQUENCES"); $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION seq_sub CONNECTION '$publisher_connstr' PUBLICATION seq_pub" ); -# Wait for initial sync to finish as well +# Wait for initial sync to finish my $synced_query = "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');"; $node_subscriber->poll_query_until('postgres', $synced_query) or die "Timed out while waiting for subscriber to synchronize data"; -# Check the data on subscriber +# +# TEST: +# +# Check the initial data on subscriber +# my $result = $node_subscriber->safe_psql( 'postgres', qq( - SELECT * FROM s; + SELECT * FROM s1; )); - is($result, '132|0|t', 'initial test data replicated'); -# create a new sequence, it should be synced +# +# TEST: +# +# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new +# sequences of the publisher, but changes to existing sequences should +# not be synced. +# + +# create a new sequence 's2', and update existing sequence 's1' $node_publisher->safe_psql( 'postgres', qq( CREATE SEQUENCE s2; INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100); -)); -# changes to existing sequences should not be synced -$node_publisher->safe_psql( - 'postgres', qq( - INSERT INTO seq_test SELECT nextval('s') FROM generate_series(1,100); + -- Existing sequence + INSERT INTO seq_test SELECT nextval('s1') FROM generate_series(1,100); )); -# Refresh publication after create a new sequence and updating existing -# sequence. +# do ALTER SUBSCRIPTION ... REFRESH PUBLICATION $result = $node_subscriber->safe_psql( 'postgres', qq( ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION )); - $node_subscriber->poll_query_until('postgres', $synced_query) or die "Timed out while waiting for subscriber to synchronize data"; -# Check the data on subscriber +# check - existing sequence is not synced $result = $node_subscriber->safe_psql( 'postgres', qq( - SELECT * FROM s; + SELECT * FROM s1; )); +is($result, '132|0|t', 'REFRESH PUBLICATION does not sync existing sequence'); -is($result, '132|0|t', 'initial test data replicated'); - +# check - newly published sequence is synced $result = $node_subscriber->safe_psql( 'postgres', qq( SELECT * FROM s2; )); +is($result, '132|0|t', 'REFRESH PUBLICATION will sync newly published sequence'); -is($result, '132|0|t', 'initial test data replicated'); +# +# TEST: +# +# ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES should cause sync of +# new sequences of the publisher, and changes to existing sequences should +# also be synced. +# -# Changes of both new and existing sequence should be synced after REFRESH -# PUBLICATION SEQUENCES. +# create a new sequence 's3', and update the existing sequence 's2' $node_publisher->safe_psql( 'postgres', qq( CREATE SEQUENCE s3; @@ -117,8 +125,7 @@ $node_publisher->safe_psql( INSERT INTO seq_test SELECT nextval('s2') FROM generate_series(1,100); )); -# Refresh publication sequences after create new sequence and updating existing -# sequence. +# do ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES $result = $node_subscriber->safe_psql( 'postgres', qq( ALTER SUBSCRIPTION seq_sub REFRESH PUBLICATION SEQUENCES @@ -127,19 +134,23 @@ $result = $node_subscriber->safe_psql( $node_subscriber->poll_query_until('postgres', $synced_query) or die "Timed out while waiting for subscriber to synchronize data"; -# Check the data on subscriber +# check - existing sequences are syned +$result = $node_subscriber->safe_psql( + 'postgres', qq( + SELECT * FROM s1; +)); +is($result, '231|0|t', 'REFRESH PUBLICATION SEQUENCES will sync existing sequences'); $result = $node_subscriber->safe_psql( 'postgres', qq( SELECT * FROM s2; )); +is($result, '231|0|t', 'REFRESH PUBLICATION SEQUENCES will sync existing sequences'); -is($result, '231|0|t', 'initial test data replicated'); - +# check - newly published sequence is synced $result = $node_subscriber->safe_psql( 'postgres', qq( SELECT * FROM s3; )); - -is($result, '132|0|t', 'initial test data replicated'); +is($result, '132|0|t', 'REFRESH PUBLICATION SEQUENCES will sync newly published sequence'); done_testing();