diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 22b2a93..16c427e 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8110,9 +8110,10 @@ SCRAM-SHA-256$<iteration count>:&l This catalog only contains tables and sequences known to the subscription after running either - CREATE SUBSCRIPTION - or ALTER SUBSCRIPTION ... REFRESH - PUBLICATION or + CREATE SUBSCRIPTION or + + ALTER SUBSCRIPTION ... REFRESH PUBLICATION or + ALTER SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index bc6d18b..a1ee74b 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -490,7 +490,10 @@ HasSubscriptionRelations(Oid subid) subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); - /* If even a single tuple exists then the subscription has tables. */ + /* + * Skip sequence tuples. If even a single table tuple + * exists then the subscription has tables. + */ if (get_rel_relkind(subrel->srrelid) != RELKIND_SEQUENCE) { has_subrels = true; @@ -508,17 +511,21 @@ HasSubscriptionRelations(Oid subid) /* * Get the relations for the subscription. * - * all_relations: - * If returning sequences, if all_relations=true get all sequences, - * otherwise only get sequences that are in 'init' state. - * If returning tables, if all_relation=true get all tables, otherwise + * get_tables: get relations for tables of the subscription. + * + * get_sequences: get relations for sequences of the subscription. + * + * all_states: + * If getting tables, if all_states is true get all tables, otherwise * only get tables that have not reached 'READY' state. + * If getting sequences, if all_states is true get all sequences, + * otherwise only get sequences that are in 'init' state. * * The returned list is palloc'ed in the current memory context. */ List * GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences, - bool all_relations) + bool all_states) { List *res = NIL; Relation rel; @@ -527,6 +534,9 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences, ScanKeyData skey[2]; SysScanDesc scan; + /* One or both of 'get_tables' and 'get_sequences' must be true. */ + Assert(get_tables || get_sequences); + rel = table_open(SubscriptionRelRelationId, AccessShareLock); ScanKeyInit(&skey[nkeys++], @@ -535,13 +545,13 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences, ObjectIdGetDatum(subid)); /* Get the relations that are not in ready state */ - if (get_tables && !all_relations) + if (get_tables && !all_states) ScanKeyInit(&skey[nkeys++], Anum_pg_subscription_rel_srsubstate, BTEqualStrategyNumber, F_CHARNE, CharGetDatum(SUBREL_STATE_READY)); /* Get the sequences that are in init state */ - else if (get_sequences && !all_relations) + else if (get_sequences && !all_states) ScanKeyInit(&skey[nkeys++], Anum_pg_subscription_rel_srsubstate, BTEqualStrategyNumber, F_CHAREQ, @@ -561,11 +571,11 @@ GetSubscriptionRelations(Oid subid, bool get_tables, bool get_sequences, subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); subreltype = get_rel_relkind(subrel->srrelid); - /* If only tables were requested, skip the sequences */ + /* Skip sequences if they were not requested */ if (subreltype == RELKIND_SEQUENCE && !get_sequences) continue; - /* If only sequences were requested, skip the tables */ + /* Skip tables if they were not requested */ if (subreltype != RELKIND_SEQUENCE && !get_tables) continue; diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index e8bd53c..2e63925 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -942,11 +942,11 @@ lastval(PG_FUNCTION_ARGS) * it is the only way to clear the is_called flag in an existing * sequence. * - * logcnt is currently used only by sequence syncworker to set the log_cnt for - * sequences while synchronization of sequence values from the publisher. + * log_cnt is currently used only by the sequence syncworker to set the + * log_cnt for sequences while synchronizing values from the publisher. */ void -do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt) +do_setval(Oid relid, int64 next, bool is_called, int64 log_cnt) { SeqTable elm; Relation seqrel; @@ -997,7 +997,7 @@ do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt) (long long) minv, (long long) maxv))); /* Set the currval() state only if iscalled = true */ - if (iscalled) + if (is_called) { elm->last = next; /* last returned number */ elm->last_valid = true; @@ -1014,8 +1014,8 @@ do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt) START_CRIT_SECTION(); seq->last_value = next; /* last fetched number */ - seq->is_called = iscalled; - seq->log_cnt = (logcnt == SEQ_LOG_CNT_INVALID) ? 0: logcnt; + seq->is_called = is_called; + seq->log_cnt = (log_cnt == SEQ_LOG_CNT_INVALID) ? 0: log_cnt; MarkBufferDirty(buf); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 984f72d..1c01a2b 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -735,9 +735,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, { char *err; WalReceiverConn *wrconn; - List *tables; - ListCell *lc; - char table_state; bool must_use_password; /* Try to connect to the publisher. */ @@ -752,7 +749,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + List *tables; List *sequences; + char table_state; check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, @@ -769,9 +768,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * info. */ tables = fetch_table_list(wrconn, publications); - foreach(lc, tables) + foreach_ptr(RangeVar, rv, tables) { - RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; relid = RangeVarGetRelid(rv, AccessShareLock, false); @@ -884,9 +882,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * sequences that have been added or removed since the last subscription * creation or publication refresh. * - * If 'all_relations' is true, it will mark all objects with "init" state - * for re-synchronization; otherwise, only the newly added tables and - * sequences will be updated based on the copy_data parameter. + * If 'all_relations' is true, mark all objects with "init" state + * for re-synchronization; otherwise, only update the newly added tables and + * sequences based on the copy_data parameter. */ static void AlterSubscription_refresh(Subscription *sub, bool copy_data, @@ -984,12 +982,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; + char relkind; relid = RangeVarGetRelid(rv, AccessShareLock, false); /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + relkind = get_rel_relkind(relid); + CheckSubscriptionRelkind(relkind, rv->schemaname, rv->relname); pubrel_local_oids[off++] = relid; @@ -1001,7 +1000,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, InvalidXLogRecPtr, true); ereport(DEBUG1, (errmsg_internal("%s \"%s.%s\" added to subscription \"%s\"", - get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", rv->schemaname, rv->relname, sub->name))); } } @@ -1086,7 +1085,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, ereport(DEBUG1, (errmsg_internal("%s \"%s.%s\" removed from subscription \"%s\"", - get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table", + relkind == RELKIND_SEQUENCE ? "sequence" : "table", get_namespace_name(get_rel_namespace(relid)), get_rel_name(relid), sub->name))); @@ -1116,6 +1115,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, */ for (off = 0; off < remove_rel_len; off++) { + /* Skip relations belonging to sequences. */ if (get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE) continue; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 04d76e7..5da5529 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -236,30 +236,27 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* * Walks the workers array and searches for one that matches given * subscription id, relid and type. - * - * We are only interested in the leader apply worker, table sync worker, or - * sequence sync worker. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType type, +logicalrep_worker_find(Oid subid, Oid relid, LogicalRepWorkerType wtype, bool only_running) { int i; LogicalRepWorker *res = NULL; + Assert(wtype == WORKERTYPE_TABLESYNC || + wtype == WORKERTYPE_SEQUENCESYNC || + wtype == WORKERTYPE_APPLY); + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - /* Search for attached worker for a given subscription id. */ + /* Search for the attached worker matching the specified criteria. */ for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - /* Skip parallel apply workers. */ - if (isParallelApplyWorker(w)) - continue; - if (w->in_use && w->subid == subid && w->relid == relid && - w->type == type && (!only_running || w->proc)) + w->type == wtype && (!only_running || w->proc)) { res = w; break; @@ -626,13 +623,13 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) * Stop the logical replication worker for subid/relid, if any. */ void -logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType type) +logicalrep_worker_stop(Oid subid, Oid relid, LogicalRepWorkerType wtype) { LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid, relid, type, false); + worker = logicalrep_worker_find(subid, relid, wtype, false); if (worker) { diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index fc36bf9..9aef45a 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -32,9 +32,11 @@ /* * fetch_remote_sequence_data * - * Retrieve the last_value, log_cnt, page_lsn and is_called of the sequence - * from the remote node. The last_value will be returned directly, while - * log_cnt, is_called and page_lsn will be provided through the output + * Retrieve sequence data (last_value, log_cnt, page_lsn and is_called) + * from the remote node. + * + * The sequence last_value will be returned directly, while + * log_cnt, is_called and page_lsn will be returned via the output * parameters log_cnt, is_called and lsn, respectively. */ static int64 @@ -46,7 +48,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname, StringInfoData cmd; TupleTableSlot *slot; Oid tableRow[4] = {INT8OID, INT8OID, BOOLOID, LSNOID}; - int64 value = (Datum) 0; + int64 last_value = (Datum) 0; bool isnull; initStringInfo(&cmd); @@ -70,7 +72,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname, errmsg("sequence \"%s.%s\" not found on publisher", nspname, relname))); - value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); + last_value = DatumGetInt64(slot_getattr(slot, 1, &isnull)); Assert(!isnull); *log_cnt = DatumGetInt64(slot_getattr(slot, 2, &isnull)); @@ -86,23 +88,24 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, char *nspname, walrcv_clear_result(res); - return value; + return last_value; } /* * Copy existing data of a sequence from publisher. * * Fetch the sequence value from the publisher and set the subscriber sequence - * with the retrieved value. Caller is responsible for locking the local + * with the same value. Caller is responsible for locking the local * relation. */ static XLogRecPtr copy_sequence(WalReceiverConn *conn, Relation rel) { StringInfoData cmd; - int64 sequence_value; - int64 log_cnt; - XLogRecPtr lsn = InvalidXLogRecPtr; + int64 seq_last_value; + int64 seq_log_cnt; + bool seq_is_called; + XLogRecPtr seq_lsn = InvalidXLogRecPtr; WalRcvExecResult *res; Oid tableRow[] = {OIDOID, CHAROID}; TupleTableSlot *slot; @@ -111,7 +114,6 @@ copy_sequence(WalReceiverConn *conn, Relation rel) bool isnull; char *nspname = get_namespace_name(RelationGetNamespace(rel)); char *relname = RelationGetRelationName(rel); - bool is_called; /* Fetch Oid. */ initStringInfo(&cmd); @@ -148,14 +150,14 @@ copy_sequence(WalReceiverConn *conn, Relation rel) ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); - sequence_value = fetch_remote_sequence_data(conn, remoteid, nspname, - relname, &log_cnt, &is_called, - &lsn); + seq_last_value = fetch_remote_sequence_data(conn, remoteid, nspname, + relname, &seq_log_cnt, &seq_is_called, + &seq_lsn); - do_setval(RelationGetRelid(rel), sequence_value, is_called, log_cnt); + do_setval(RelationGetRelid(rel), seq_last_value, seq_is_called, seq_log_cnt); /* return the LSN when the sequence state was set */ - return lsn; + return seq_lsn; } /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 3e162b9..6e7ed8e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -680,7 +680,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * apply process (currently, all that have state SUBREL_STATE_INIT) and manage * synchronization for them. * - * If there is a sequence synchronization worker running already, no need to + * If a sequence synchronization worker is running already, there is no need to * start a new one; the existing sequence sync worker will synchronize all the * sequences. If there are still any sequences to be synced after the sequence * sync worker exited, then a new sequence sync worker can be started in the @@ -697,7 +697,7 @@ process_syncing_sequences_for_apply() Assert(!IsTransactionState()); /* - * Start sequence sync worker if there is not one already. + * Start the sequence sync worker if needed, and there is not one already. */ foreach_ptr(SubscriptionRelState, rstate, sequence_states_not_ready) { @@ -753,6 +753,7 @@ process_syncing_sequences_for_apply() now, wal_retrieve_retry_interval)) { MyLogicalRepWorker->sequencesync_failure_time = 0; + logicalrep_worker_launch(WORKERTYPE_SEQUENCESYNC, MyLogicalRepWorker->dbid, MySubscription->oid, @@ -1689,16 +1690,14 @@ copy_table_done: /* * Common code to fetch the up-to-date sync state info into the static lists. * - * Copy tables that are not ready into table_states_not_ready and sequences - * that are not ready into sequence_states_not_ready. The pg_subscription_rel - * table is shared between sequences and tables. Because changes to either - * sequences or relations can affect the validity of relation states, we update - * both table_states_not_ready and sequence_states_not_ready simultaneously - * to ensure consistency, rather than updating them separately. Returns true if - * subscription has 1 or more tables, else false. + * Copy tables that are not READY state into table_states_not_ready, and sequences + * that have INIT state into sequence_states_not_ready. The pg_subscription_rel + * catalog is shared by tables and sequences. Changes to either sequences or + * tables can affect the validity of relation states, so we update both + * table_states_not_ready and sequence_states_not_ready simultaneously + * to ensure consistency. * - * Note: If this function started the transaction (indicated by the parameter) - * then it is the caller's responsibility to commit it. + * Returns true if subscription has 1 or more tables, else false. */ static bool FetchTableStates(void) @@ -1728,7 +1727,7 @@ FetchTableStates(void) } /* - * Fetch the tables that are in non-ready state and the sequences that + * Fetch tables that are in non-ready state, and sequences that * are in init state. */ rstates = GetSubscriptionRelations(MySubscription->oid, true, true, diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index 71d8c76..b81f496 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -62,7 +62,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt); extern void SequenceChangePersistence(Oid relid, char newrelpersistence); extern void DeleteSequenceTuple(Oid relid); extern void ResetSequence(Oid seq_relid); -extern void do_setval(Oid relid, int64 next, bool iscalled, int64 logcnt); +extern void do_setval(Oid relid, int64 next, bool is_called, int64 log_cnt); extern void ResetSequenceCaches(void); extern void seq_redo(XLogReaderState *record); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 8a12ecb..6b201d6 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -242,7 +242,7 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, - LogicalRepWorkerType type, + LogicalRepWorkerType wtype, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock);