Re: Logical Replication of sequences

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: vignesh C <vignesh21(at)gmail(dot)com>
Cc: "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, shveta malik <shveta(dot)malik(at)gmail(dot)com>, Shlok Kyal <shlok(dot)kyal(dot)oss(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Peter Eisentraut <peter(at)eisentraut(dot)org>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Euler Taveira <euler(at)eulerto(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com>, "Jonathan S(dot) Katz" <jkatz(at)postgresql(dot)org>
Subject: Re: Logical Replication of sequences
Date: 2025-04-15 06:33:04
Message-ID: CAHut+Ptui-_0MwvFLyBAugC1Tk3UEM5rcKqk44Qq9LFbO91zQg@mail.gmail.com
Lists: pgsql-hackers

Hi Vignesh,

Some review comments for v20250525-0004.

======
Commit message

1.
A new sequencesync worker is launched as needed to synchronize sequences.
It does the following:
a) Retrieves remote values of sequences with pg_sequence_state() INIT.
b) Log a warning if the sequence parameters differ between the
publisher and subscriber.
c) Sets the local sequence values accordingly.
d) Updates the local sequence state to READY.
e) Repeat until all done; Commits synchronized sequences in batches of 100

~

/Log a warning/Logs a warning/
/Repeat until all done/Repeats until all done/

~~~

2.
1) CREATE SUBSCRIPTION
- (PG17 command syntax is unchanged)
- The subscriber retrieves sequences associated with publications.
- Published sequences are added to pg_subscription_rel with INIT state.
- Initiates the sequencesync worker (see above) to synchronize all
sequences.

~

2a.
Since PG18 is frozen now, I think you can say "PG18 command syntax is unchanged"
(replace same elsewhere in this commit message)

~

2b.
/Initiates/Initiate/
(replace same elsewhere in this commit message)

======
src/backend/catalog/pg_publication.c

pg_get_publication_sequences:

3.
+Datum
+pg_get_publication_sequences(PG_FUNCTION_ARGS)
+{
+ FuncCallContext *funcctx;
+ char *pubname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ Publication *publication;
+ List *sequences = NIL;
+
+ /* stuff done only on the first call of the function */
+ if (SRF_IS_FIRSTCALL())
+ {

The 'pubname' and 'publication' variables can be declared later,
within the SRF_IS_FIRSTCALL block.

======
src/backend/commands/subscriptioncmds.c

CreateSubscription:

4.
+ /*
+ * XXX: If the subscription is for a sequence-only publication, creating
+ * this origin is unnecessary. It can be created later during the ALTER
+ * SUBSCRIPTION ... REFRESH command, if the publication is updated to
+ * include tables or tables in schemas.
+ */

Since it already says "to include tables", I didn't think you needed
to say "tables in schemas".

~~~

5.
+ *
+ * XXX: If the subscription is for a sequence-only publication,
+ * creating this slot is unnecessary. It can be created later
+ * during the ALTER SUBSCRIPTION ... REFRESH PUBLICATION or ALTER
+ * SUBSCRIPTION ... REFRESH PUBLICATION SEQUENCES command, if the
+ * publication is updated to include tables or tables in schema.
*/

(same comment as above #4).

I thought maybe it is redundant to say "or tables in schema".

~~~

AlterSubscription_refresh:

6.
+#ifdef USE_ASSERT_CHECKING
+ if (resync_all_sequences)
+ Assert(copy_data && !refresh_tables && refresh_sequences);
+#endif
+

Maybe this can have a comment like /* Sanity checks for parameter values */

~~~

7.
+ sub_remove_rels[remove_rel_len].relid = relid;
+ sub_remove_rels[remove_rel_len++].state = state;

/*
- * For READY state, we would have already dropped the
- * tablesync origin.
+ * A single sequencesync worker synchronizes all sequences, so
+ * only stop workers when relation kind is not sequence.
*/
- if (state != SUBREL_STATE_READY)
+ if (relkind != RELKIND_SEQUENCE)

Should those assignments...:
sub_remove_rels[remove_rel_len].relid = relid;
sub_remove_rels[remove_rel_len++].state = state;

...be done only inside the "if (relkind != RELKIND_SEQUENCE)"? It
seems like they'll be skipped anyway in subsequent code -- see "if
(get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE)".
Perhaps if these assignments are moved, then the subsequent skipping
code is also not needed anymore?

======
src/backend/replication/logical/launcher.c

logicalrep_worker_launch:

8.
+ case WORKERTYPE_SEQUENCESYNC:
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "SequenceSyncWorkerMain");
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication sequencesync worker for subscription %u",
+ subid);
+ snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication sequencesync worker");
+ break;
+

Previously all these cases were in alphabetical order. Maybe you can
move this case to keep it that way.

~~~

pg_stat_get_subscription:

9.
case WORKERTYPE_TABLESYNC:
values[9] = CStringGetTextDatum("table synchronization");
break;
+ case WORKERTYPE_SEQUENCESYNC:
+ values[9] = CStringGetTextDatum("sequence synchronization");
+ break;

Previously all these cases were in alphabetical order. Maybe you can
move this case to keep it that way.

======
.../replication/logical/sequencesync.c

ProcessSyncingSequencesForApply:

10.
+ * To prevent starting the sequencesync worker at a high frequency after a
+ * failure, we store its last failure time. We start the sequencesync worker
+ * again after waiting at least wal_retrieve_retry_interval.

I felt this comment might be better inside the function where it is
doing the TimestampDifferenceExceeds check.
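
To illustrate the throttling that the comment describes, here is a rough
standalone sketch. It is not the patch code: the function names are made
up for illustration, the local helper only mimics what I understand
TimestampDifferenceExceeds to do, and treating 0 as "no failure recorded"
is my assumption.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-in for PostgreSQL's TimestampTz (microseconds). */
typedef int64_t TimestampTz;

/*
 * Local imitation of TimestampDifferenceExceeds(): true when at least
 * 'msec' milliseconds have elapsed between start and stop.
 */
static bool
timestamp_difference_exceeds(TimestampTz start, TimestampTz stop, int msec)
{
    return (stop - start) >= (TimestampTz) msec * 1000;
}

/*
 * Hypothetical helper: decide whether the sequencesync worker may be
 * started again. Assumes last_failure == 0 means no failure was recorded.
 */
bool
can_restart_sequencesync(TimestampTz last_failure, TimestampTz now,
                         int retry_interval_ms)
{
    if (last_failure == 0)
        return true;

    /* Wait at least the retry interval after the last failure. */
    return timestamp_difference_exceeds(last_failure, now,
                                        retry_interval_ms);
}
```

Keeping the explanatory comment next to a check like this one would make
the reason for the stored failure time obvious at the point of use.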

~~~

10b.
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ Assert(get_rel_relkind(rstate->relid) == RELKIND_SEQUENCE);

Maybe the Assert should come first, before the transaction code?

~~~

11.
+ /*
+ * If there are free sync worker slot(s), start a new sequencesync
+ * worker, and break from the loop.
+ */

Why plural? Can't you just say:

SUGGESTION:
If there is a free sync worker slot, start a new sequencesync worker,
and break from the loop.

~~~

fetch_remote_sequence_data:

12.
+ Oid tableRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID, BOOLOID,
+ LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};

Is 'tableRow' a good name for this? Calling it 'seqRow' might be better.

~~~

13.
+ seq_params_match = seqform->seqtypid == seqtypid &&
+ seqform->seqmin == seqmin && seqform->seqmax == seqmax &&
+ seqform->seqcycle == seqcycle &&
+ seqform->seqstart == seqstart &&
+ seqform->seqincrement == seqincrement;

By the time the WARNING for this mismatch gets logged, the knowledge
of *what* differed seems lost. Maybe it is not possible, but I
wondered if it would make the warning much more useful if you could
somehow also log attribute values. That will help the user understand
what caused the clash in the first place. Otherwise they will have to
go to the trouble to try to figure it out for themselves.
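
As a rough idea of what I mean, here is a standalone sketch that records
which parameters differ instead of only computing a boolean. Everything
here is hypothetical (the SeqParams struct, note_diff and
seq_params_match_sketch are invented for illustration and are not the
patch's code); the real code would presumably build the text for
errdetail instead of filling a plain buffer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for the sequence parameters being compared. */
typedef struct SeqParams
{
    uint32_t typid;
    int64_t  start;
    int64_t  increment;
    int64_t  min;
    int64_t  max;
    bool     cycle;
} SeqParams;

/*
 * Append "name (local=X, remote=Y)" to buf if the two values differ.
 * Returns the new write offset; output is truncated if buf is too small.
 */
static size_t
note_diff(char *buf, size_t len, size_t off, const char *name,
          long long local, long long remote)
{
    int n;

    if (local == remote || off >= len)
        return off;

    n = snprintf(buf + off, len - off, "%s%s (local=%lld, remote=%lld)",
                 off > 0 ? ", " : "", name, local, remote);
    if (n < 0)
        return off;
    return ((size_t) n >= len - off) ? len - 1 : off + (size_t) n;
}

/*
 * Returns true if all parameters match. Otherwise returns false and
 * leaves a comma-separated description of the differences in buf.
 */
bool
seq_params_match_sketch(const SeqParams *local, const SeqParams *remote,
                        char *buf, size_t len)
{
    size_t off = 0;

    assert(buf != NULL && len > 0);
    buf[0] = '\0';

    off = note_diff(buf, len, off, "typid", local->typid, remote->typid);
    off = note_diff(buf, len, off, "start", local->start, remote->start);
    off = note_diff(buf, len, off, "increment", local->increment, remote->increment);
    off = note_diff(buf, len, off, "min", local->min, remote->min);
    off = note_diff(buf, len, off, "max", local->max, remote->max);
    off = note_diff(buf, len, off, "cycle", local->cycle, remote->cycle);

    return off == 0;
}
```

With something like this, the WARNING could name the offending
attributes and both values, so the user does not have to compare the two
sequence definitions by hand.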

~~~

copy_sequence:

14.
+/*
+ * Copy existing data of a sequence from publisher.

/from/from the/

~~~

15.
+ Oid tableRow[] = {OIDOID, CHAROID};

Should this be 'seqRow' or 'relRow'?

~~~

16.
+ *sequence_mismatch = !fetch_remote_sequence_data(conn, relid, remoteid,
+ nspname, relname,
+ &seq_log_cnt, &seq_is_called,
+ &seq_page_lsn, &seq_last_value);
+
+ /* Update the sequence only if the parameters are identical. */
+ if (*sequence_mismatch == false)
+ SetSequence(RelationGetRelid(rel), seq_last_value, seq_is_called,
+ seq_log_cnt);
+
+ /* Return the LSN when the sequence state was set. */
+ return seq_page_lsn;

16a.
Is that a bug in the code? AFAICT the fetch_remote_sequence_data is
going to overwrite the new 'seq_page_lsn' even if some mismatch is
detected. Is that intentional?

~

16b.
Why not say "if (!*sequence_mismatch)"?

~

16c.
Since it is not 100% clear from this code what the value of
seq_page_lsn will be when there was a mismatch, maybe you should have a
more explicit return here:

SUGGESTION
return *sequence_mismatch ? InvalidXLogRecPtr : seq_page_lsn;
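
To show the effect of that return, here is a small standalone sketch. The
typedef and macro are stand-ins so it compiles outside the tree, and
fetch_stub/copy_sequence_sketch are hypothetical simplifications of the
real functions; the stub assumes (per my reading in 16a) that the LSN is
filled in even when the parameters do not match.

```c
#include <stdbool.h>
#include <stdint.h>

/* Stand-ins for the PostgreSQL definitions. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Hypothetical stub for fetch_remote_sequence_data(): always writes the
 * remote LSN, and returns whether the sequence parameters matched.
 */
static bool
fetch_stub(bool params_match, XLogRecPtr remote_lsn, XLogRecPtr *page_lsn)
{
    *page_lsn = remote_lsn;
    return params_match;
}

/* Simplified shape of copy_sequence() with the explicit return. */
XLogRecPtr
copy_sequence_sketch(bool params_match, XLogRecPtr remote_lsn,
                     bool *sequence_mismatch)
{
    XLogRecPtr seq_page_lsn = InvalidXLogRecPtr;

    *sequence_mismatch = !fetch_stub(params_match, remote_lsn,
                                     &seq_page_lsn);

    /* The real code would call SetSequence() here when there is no mismatch. */

    /* Never hand back an LSN for a sequence that was not updated. */
    return *sequence_mismatch ? InvalidXLogRecPtr : seq_page_lsn;
}
```

This way the caller gets a well-defined value on mismatch regardless of
what the fetch function did to the output parameter.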

~~~

append_mismatched_sequences:

17.
+/*
+ * append_mismatched_sequences
+ *
+ * Appends details of sequences that have discrepancies between the publisher
+ * and subscriber to the mismatched_seqs string.
+ */

Hmm. It would be good if it did include sequence details, but I think
for now there are no real "details of sequences" here, just the
schema name and sequence name.

~~~

LogicalRepSyncSequences:

18.
+/*
+ * Synchronizing each sequence individually incurs overhead from starting
+ * and committing a transaction repeatedly. Additionally, we want to avoid
+ * keeping transactions open for extended periods by setting excessively
+ * high values.
+ */
+#define MAX_SEQUENCES_SYNC_PER_BATCH 100

Just saying "by setting excessively high values." doesn't really give
any context. High values of what? You have to guess what it means.

I think it is more like below.

SUGGESTION
We batch synchronize multiple sequences per transaction, because the
alternative of synchronizing each sequence individually incurs
overhead of starting and committing transactions repeatedly. On the
other hand, we want to avoid keeping this batch transaction open for
extended periods so it is currently limited to 100 sequences per
batch.
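
For what it's worth, the trade-off described in that suggested comment
can be sketched standalone like below. The BatchStats counters and
sync_in_batches are hypothetical and only stand in for the real
StartTransactionCommand/CommitTransactionCommand calls and the per
sequence copy.

```c
#include <stddef.h>

#define MAX_SEQUENCES_SYNC_PER_BATCH 100

/* Hypothetical counters standing in for the real transaction calls. */
typedef struct BatchStats
{
    int started;    /* transactions started */
    int committed;  /* transactions committed */
    int synced;     /* sequences synchronized */
} BatchStats;

/* Synchronize nsequences sequences, one transaction per batch. */
BatchStats
sync_in_batches(size_t nsequences)
{
    BatchStats stats = {0, 0, 0};
    size_t     in_batch = 0;

    for (size_t i = 0; i < nsequences; i++)
    {
        if (in_batch == 0)
            stats.started++;    /* begin a new batch transaction */

        stats.synced++;         /* synchronize one sequence */
        in_batch++;

        /* Commit when the batch is full or nothing is left to sync. */
        if (in_batch == MAX_SEQUENCES_SYNC_PER_BATCH || i + 1 == nsequences)
        {
            stats.committed++;
            in_batch = 0;
        }
    }

    return stats;
}
```

So 250 sequences cost 3 transactions instead of 250, and no single
transaction covers more than 100 sequences.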

~~~

19.
+ /*
+ * In case sequence copy fails, throw a warning for the sequences that
+ * did not match before exiting.
+ */
+ PG_TRY();
+ {
+ sequence_lsn = copy_sequence(LogRepWorkerWalRcvConn, sequence_rel,
+ &sequence_mismatch);
+ }
+ PG_CATCH();
+ {
+ if (sequence_mismatch)
+ append_mismatched_sequences(mismatched_seqs, sequence_rel);
+
+ report_mismatched_sequences(mismatched_seqs);
+ PG_RE_THROW();
+ }

If we got to the CATCH then it means some ERROR happened, but at that
point I really don't think sequence_mismatch is likely to be set to
true. Maybe this is just being extra careful, "just in case"?

~~~

20.
+ if (mismatched_seqs->len)
+ sequence_sync_error = true;
+
+ report_mismatched_sequences(mismatched_seqs);

I think you can put that call to report_mismatched_sequences under the
same condition, because if there are no mismatches then there will be
nothing to report anyhow.

~~~

21.
+ /*
+ * Sequence synchronization failed due to a parameter mismatch. Setting
+ * the failure time to prevent repeated initiation of the sequencesync
+ * worker.
+ */

/Setting/Set/

/to prevent repeated initiation/to prevent immediate initiation/ (??)

======
src/backend/replication/logical/syncutils.c

FetchRelationStates:

22.
+ /*
+ * This is declared as static, since the same value can be used until the
+ * system table is invalidated.
+ */
static bool has_subtables = false;

/This/has_subtables/

======
src/backend/replication/logical/tablesync.c

ProcessSyncingTablesForApply:

23.
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ Assert(get_rel_relkind(rstate->relid) != RELKIND_SEQUENCE);
+

Should this Assert come before the other tx code?

~~~

AllTablesyncsReady:

24.
+ bool has_tables = false;

/* We need up-to-date sync state info for subscription tables here. */
- has_subrels = FetchRelationStates(&started_tx);
-
- if (started_tx)
- {
- CommitTransactionCommand();
- pgstat_report_stat(true);
- }
+ has_tables = FetchRelationStates();

Don't need to assign has_tables to false if the value will be
immediately overwritten anyhow.

======
src/bin/pg_dump/pg_dump.c

getSubscriptionRelations:

25.
-getSubscriptionTables(Archive *fout)
+getSubscriptionRelations(Archive *fout)

Although you changed the function comment and the function name,
there is still code within that function referring to tables. Should
that also be changed to relations?

======
src/include/commands/sequence.h

26.
+#define SEQ_LOG_CNT_INVALID 0

Zero seemed like a curious value to use as the "invalid" count. I was
wondering whether it would be better to define this as -1, but then in
the SetSequence function do some explicit code like below:

seq->log_cnt = log_cnt == SEQ_LOG_CNT_INVALID ? 0 : log_cnt;
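
A fuller standalone sketch of that idea is below. The SeqDataSketch
struct and set_sequence_sketch are hypothetical simplifications, not the
real SetSequence signature, and -1 is only my proposed sentinel.

```c
#include <stdbool.h>
#include <stdint.h>

/* Proposed sentinel: a value that can never be a real log_cnt. */
#define SEQ_LOG_CNT_INVALID (-1)

/* Hypothetical, simplified stand-in for the sequence tuple data. */
typedef struct SeqDataSketch
{
    int64_t last_value;
    int64_t log_cnt;
    bool    is_called;
} SeqDataSketch;

/* Simplified shape of SetSequence(): map the sentinel to 0 explicitly. */
void
set_sequence_sketch(SeqDataSketch *seq, int64_t last_value, bool is_called,
                    int64_t log_cnt)
{
    seq->last_value = last_value;
    seq->is_called = is_called;
    seq->log_cnt = (log_cnt == SEQ_LOG_CNT_INVALID) ? 0 : log_cnt;
}
```

The stored value ends up the same as today for the "invalid" case, but a
caller passing a genuine log_cnt of 0 is no longer indistinguishable from
one passing "no value".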

======
src/test/subscription/t/036_sequences.pl

27.
+# Check the initial data on subscriber
+my $result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '100|32|t', 'initial test data replicated');

I think this deserves some explanatory comment about the magic number
32? But, it may be better to have a general comment at the top of this
TAP test to explain other magic numbers like 31 etc...

~~~

28.
+# Check - existing sequence is not synced
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s1;
+));
+is($result, '100|32|t',
+ 'REFRESH PUBLICATION does not sync existing sequence');

This test would be clearer if you also checked those same sequence
values at the publisher side, to show they are different. Don't need
to do it every time, but maybe just this first time would be good.

~~~

29.
+# Check - newly published sequence values are not updated
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT last_value, log_cnt, is_called FROM regress_s4;
+));
+is($result, '1|0|f',
+ 'REFRESH PUBLICATION will sync newly published sequence');
+

(This is the copy_data=false test)

29a.
Maybe it is good here also to show the sequence value at the publisher
to see if it is different.

~

29b.
The message 'REFRESH PUBLICATION will sync newly published sequence'
seems wrong because the values are NOT synced when copy_data=false.

======
Kind Regards,
Peter Smith.
Fujitsu Australia
