Re: Logical Replication of sequences

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: vignesh C <vignesh21(at)gmail(dot)com>
Cc: Shlok Kyal <shlok(dot)kyal(dot)oss(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, Peter Eisentraut <peter(at)eisentraut(dot)org>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Tomas Vondra <tomas(dot)vondra(at)enterprisedb(dot)com>, Euler Taveira <euler(at)eulerto(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Hou, Zhijie/侯 志杰 <houzj(dot)fnst(at)fujitsu(dot)com>, "Jonathan S(dot) Katz" <jkatz(at)postgresql(dot)org>
Subject: Re: Logical Replication of sequences
Date: 2024-07-12 02:52:25
Message-ID: CAHut+PuDkR7thJy6JFiSYeYb5eEX+Gpcc3Qz9uc-cVj31GnswQ@mail.gmail.com
Lists: pgsql-hackers

Hi Vignesh. Here are the rest of my comments for patch v20240705-0003.

(Apologies for the length of this post; but it was unavoidable due to
this being the 1st review of a very large 1700-line patch)

======
src/backend/catalog/pg_subscription.c

1. GetSubscriptionSequences

+/*
+ * Get the sequences for the subscription.
+ *
+ * The returned list is palloc'ed in the current memory context.
+ */

Is that comment right? The palloc seems to be done in
CacheMemoryContext, not in the current context.

~

2.
The code is very similar to the other function
GetSubscriptionRelations(). In fact I did not understand how the 2
functions know what they are returning:

E.g. how does GetSubscriptionRelations not return sequences too?
E.g. how does GetSubscriptionSequences not return relations too?

======
src/backend/commands/subscriptioncmds.c

CreateSubscription:
nitpick - put the sequence logic *after* the relations logic because
that is the order that seems to be used everywhere else.

~~~

3. AlterSubscription_refresh

- logicalrep_worker_stop(sub->oid, relid);
+ /* Stop the worker if relation kind is not sequence*/
+ if (relkind != RELKIND_SEQUENCE)
+ logicalrep_worker_stop(sub->oid, relid);

Can you give more reasons in the comment about why the stop is skipped
for the sequence worker?

~

nitpick - period and space in the comment

~~~

4.
for (off = 0; off < remove_rel_len; off++)
{
if (sub_remove_rels[off].state != SUBREL_STATE_READY &&
- sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE)
+ sub_remove_rels[off].state != SUBREL_STATE_SYNCDONE &&
+ get_rel_relkind(sub_remove_rels[off].relid) != RELKIND_SEQUENCE)
{
Would this new logic perhaps be better written as:

if (get_rel_relkind(sub_remove_rels[off].relid) == RELKIND_SEQUENCE)
continue;

~~~

AlterSubscription_refreshsequences:
nitpick - rename AlterSubscription_refresh_sequences

~

5.
There is significant code overlap between the existing
AlterSubscription_refresh and the new function
AlterSubscription_refreshsequences. I wonder if it is better to try to
combine the logic and just pass another parameter to
AlterSubscription_refresh saying to update the existing sequences if
necessary. Particularly since the AlterSubscription_refresh is already
tweaked to work for sequences. Of course, the resulting combined
function would be large and complex, but maybe that would still be
better than having giant slabs of nearly identical cut/paste code.
Thoughts?

~~~

check_publications_origin:
nitpick - move variable declarations

~~~

fetch_sequence_list:
nitpick - change /tablelist/seqlist/
nitpick - tweak the spaces of the SQL for alignment (similar to
fetch_table_list)

~

6.
+ " WHERE s.pubname IN (");
+ first = true;
+ foreach_ptr(String, pubname, publications)
+ {
+ if (first)
+ first = false;
+ else
+ appendStringInfoString(&cmd, ", ");
+
+ appendStringInfoString(&cmd, quote_literal_cstr(pubname->sval));
+ }
+ appendStringInfoChar(&cmd, ')');

IMO this can be written much better by using get_publications_str()
function to do all this list work.
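
To illustrate the kind of helper meant here, below is a rough standalone sketch (not PostgreSQL code). The toy_* names, the fixed-size buffer, and the quoting rule (only single quotes are doubled) are assumptions made for the example; they are not the signature or behavior of the real get_publications_str() or quote_literal_cstr().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Append one character to a NUL-terminated buffer; false if it does not fit. */
static bool
toy_append_char(char *buf, size_t cap, char c)
{
	size_t		len = strlen(buf);

	if (len + 1 >= cap)
		return false;
	buf[len] = c;
	buf[len + 1] = '\0';
	return true;
}

/* Append a whole string, character by character. */
static bool
toy_append_str(char *buf, size_t cap, const char *s)
{
	for (; *s; s++)
		if (!toy_append_char(buf, cap, *s))
			return false;
	return true;
}

/*
 * Hypothetical helper: build "'pub1', 'pub2'" from a list of names, so the
 * caller only has to wrap the result in "IN (" ... ")".
 * Simplified quoting: single quotes are doubled, nothing else is escaped.
 */
static bool
toy_publications_str(const char *const *names, size_t n, char *buf, size_t cap)
{
	assert(buf != NULL && cap > 0);
	buf[0] = '\0';

	for (size_t i = 0; i < n; i++)
	{
		if (i > 0 && !toy_append_str(buf, cap, ", "))
			return false;
		if (!toy_append_char(buf, cap, '\''))
			return false;
		for (const char *p = names[i]; *p; p++)
		{
			if (*p == '\'' && !toy_append_char(buf, cap, '\''))
				return false;
			if (!toy_append_char(buf, cap, *p))
				return false;
		}
		if (!toy_append_char(buf, cap, '\''))
			return false;
	}
	return true;
}
```

With such a helper the "first" flag and the hand-written loop disappear from the caller, which is the point of the suggestion.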

======
src/backend/replication/logical/launcher.c

7. logicalrep_worker_find

/*
* Walks the workers array and searches for one that matches given
* subscription id and relid.
*
* We are only interested in the leader apply worker or table sync worker.
*/

The above function comment (not in the patch 0003) is stale because
AFAICT this is also going to return sequence workers if it finds
one.

~~~

8. logicalrep_sequence_sync_worker_find

+/*
+ * Walks the workers array and searches for one that matches given
+ * subscription id.
+ *
+ * We are only interested in the sequence sync worker.
+ */
+LogicalRepWorker *
+logicalrep_sequence_sync_worker_find(Oid subid, bool only_running)

There are other similar functions for walking the workers array to
search for a worker. Instead of having different functions for
different cases, wouldn't it be cleaner to combine these into a single
function, where you pass a parameter (e.g. a mask of worker types that
you are interested in finding)?
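
A rough standalone sketch of that idea follows (not PostgreSQL code). The ToyWorker struct, the bit-flag enum and toy_worker_find() are all made up for illustration; the real LogicalRepWorker and LogicalRepWorkerType definitions differ, and locking is ignored here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Oid;			/* stand-in for the PostgreSQL typedef */

/* Hypothetical worker types, one bit each so they can be OR'ed into a mask. */
typedef enum ToyWorkerType
{
	TOY_WORKER_APPLY = 1 << 0,
	TOY_WORKER_TABLESYNC = 1 << 1,
	TOY_WORKER_SEQUENCESYNC = 1 << 2,
	TOY_WORKER_PARALLEL_APPLY = 1 << 3
} ToyWorkerType;

typedef struct ToyWorker
{
	bool		in_use;
	bool		running;		/* stands in for "proc != NULL" */
	ToyWorkerType type;
	Oid			subid;
	Oid			relid;			/* meaningful only for table sync workers */
} ToyWorker;

/*
 * Return the first in-use worker of subscription 'subid' whose type is in
 * 'type_mask', or NULL. 'relid' is compared only for table sync workers.
 */
static ToyWorker *
toy_worker_find(ToyWorker *workers, size_t nworkers, Oid subid, Oid relid,
				unsigned type_mask, bool only_running)
{
	assert(workers != NULL || nworkers == 0);

	for (size_t i = 0; i < nworkers; i++)
	{
		ToyWorker  *w = &workers[i];

		if (!w->in_use || w->subid != subid)
			continue;
		if (((unsigned) w->type & type_mask) == 0)
			continue;
		if (w->type == TOY_WORKER_TABLESYNC && w->relid != relid)
			continue;
		if (only_running && !w->running)
			continue;
		return w;
	}
	return NULL;
}
```

A caller wanting only the sequence sync worker would pass TOY_WORKER_SEQUENCESYNC; a caller wanting "apply or tablesync" would pass TOY_WORKER_APPLY | TOY_WORKER_TABLESYNC, so one loop serves every case.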

~

nitpick - declare a for loop variable 'i'

~~~

9. logicalrep_apply_worker_find

+static LogicalRepWorker *
+logicalrep_apply_worker_find(Oid subid, bool only_running)

All the other find* functions assume the lock is already held
(Assert(LWLockHeldByMe(LogicalRepWorkerLock));). But this one is
different. IMO it might be better to acquire the lock in the caller to
make all the find* functions look the same. Anyway, that will help to
combine everything into 1 "find" function as suggested in the previous
review comment #8.

~

nitpick - declare a for loop variable 'i'
nitpick - removed unnecessary parens in condition.

~~~

10. logicalrep_worker_launch

/*----------
* Sanity checks:
* - must be valid worker type
* - tablesync workers are only ones to have relid
* - parallel apply worker is the only kind of subworker
*/

The above code-comment (not in the 0003 patch) seems stale. This
should now also mention sequence sync workers, right?

~~~

11.
- Assert(is_tablesync_worker == OidIsValid(relid));
+ Assert(is_tablesync_worker == OidIsValid(relid) ||
is_sequencesync_worker == OidIsValid(relid));

IIUC there is only a single sequence sync worker for handling all the
sequences. So, what does the 'relid' actually mean here when there are
multiple sequences?

~~~

12. logicalrep_seqsyncworker_failuretime

+/*
+ * Set the sequence sync worker failure time
+ *
+ * Called on sequence sync worker failure exit.
+ */

12a.
The comment should be improved to make it more clear that the failure
time of the sync worker information is stored with the *apply* worker.
See also other review comments in this post about this area -- perhaps
all this can be removed?

~

12b.
Curious if this had to be a separate exit handler or if maybe this could
have been handled by the existing logicalrep_worker_onexit handler.
See also other review comments in this post about this area --
perhaps all this can be removed?

======
.../replication/logical/sequencesync.c

13. fetch_sequence_data

13a.
The function comment has no explanation of what exactly the returned
value means. It seems like it is what you will assign as 'last_value'
on the subscriber-side.

~

13b.
Some of the table functions like this are called like
'fetch_remote_table_info()'. Maybe it is better to do similar here
(e.g. include the word "remote" in the function name).

~

14.
The reason for the addition logic "(last_value + log_cnt)" is not
obvious. I am guessing it might be related to code from
'nextval_internal' (fetch = log = fetch + SEQ_LOG_VALS;) but it is
complicated. It is unfortunate that the field 'log_cnt' seems hardly
commented anywhere at all.

Also, I am not 100% sure if I trust the logic in the first place. The
caller of this function is doing:
sequence_value = fetch_sequence_data(conn, remoteid, &lsn);
/* sets the sequence with sequence_value */
SetSequenceLastValue(RelationGetRelid(rel), sequence_value);

Won't that mean you can get to a situation where the subscriber-side
result of lastval('s') can be *ahead* of lastval('s') on the
publisher? That doesn't seem good.

~~~

copy_sequence:

nitpick - ERROR message. Reword "for table..." to be more like the 2nd
error message immediately below.
nitpick - /RelationGetRelationName(rel)/relname/
nitpick - moved the Assert for 'relkind' to be nearer the assignment.

~

15.
+ /*
+ * Logical replication of sequences is based on decoding WAL records,
+ * describing the "next" state of the sequence the current state in the
+ * relfilenode is yet to reach. But during the initial sync we read the
+ * current state, so we need to reconstruct the WAL record logged when we
+ * started the current batch of sequence values.
+ *
+ * Otherwise we might get duplicate values (on subscriber) if we failed
+ * over right after the sync.
+ */
+ sequence_value = fetch_sequence_data(conn, remoteid, &lsn);
+
+ /* sets the sequence with sequence_value */
+ SetSequenceLastValue(RelationGetRelid(rel), sequence_value);

(This is related to some earlier review comment #14 above). IMO all
this tricky commentary belongs in the function header of
"fetch_sequence_data", where it should be describing that function's
return value.

~~~

LogicalRepSyncSequences:
nitpick - declare void param
nitpick - indentation
nitpick - wrapping
nitpick - /sequencerel/sequence_rel/
nitpick - blank lines

~

16.
+ if (check_enable_rls(RelationGetRelid(sequencerel), InvalidOid,
false) == RLS_ENABLED)
+ ereport(ERROR,
+ errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("user \"%s\" cannot replicate into relation with row-level
security enabled: \"%s\"",
+ GetUserNameFromId(GetUserId(), true),
+ RelationGetRelationName(sequencerel)));

This should be reworded to refer to sequences instead of relations. Maybe like:
"user \"%s\" cannot replicate into sequence \"%s\" with row-level
security enabled"

~

17.
The calculations involving the BATCH size seem a bit tricky.
e.g. in 1st place it is doing: (curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH == 0)
e.g. in 2nd place it is doing: ((next_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0)

Maybe this batch logic can be simplified somehow using a bool variable
for the calculation?

Also, where does the number 100 come from? Why not 1000? Why not 10?
Why have batching at all? Maybe there should be some comment to
describe the reason and the chosen value.
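
For example, a rough standalone sketch of the batching with named booleans (not PostgreSQL code). It assumes the 1st test marks where a transaction starts and the 2nd marks where it is committed; TOY_BATCH_SIZE, ToyBatchStats and toy_sync_in_batches() are invented for illustration, and the counters stand in for the real transaction calls.

```c
#include <assert.h>
#include <stdbool.h>

#define TOY_BATCH_SIZE 100		/* placeholder value, not a recommendation */

typedef struct ToyBatchStats
{
	int			started;		/* transactions started */
	int			committed;		/* transactions committed */
	int			synced;			/* sequences processed */
} ToyBatchStats;

/* Walk seq_count sequences, opening and closing one transaction per batch. */
static ToyBatchStats
toy_sync_in_batches(int seq_count)
{
	ToyBatchStats stats = {0, 0, 0};

	assert(seq_count >= 0);

	for (int curr_seq = 0; curr_seq < seq_count; curr_seq++)
	{
		bool		batch_start = (curr_seq % TOY_BATCH_SIZE) == 0;
		bool		batch_end = ((curr_seq + 1) % TOY_BATCH_SIZE) == 0 ||
			(curr_seq + 1) == seq_count;

		if (batch_start)
			stats.started++;	/* StartTransactionCommand() would go here */

		stats.synced++;			/* copy one sequence */

		if (batch_end)
			stats.committed++;	/* LOG the batch, then commit */
	}

	return stats;
}
```

Naming the two conditions makes it easier to see that every batch that starts is also committed, including the final partial batch.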

~

18.
+ next_seq = curr_seq + 1;
+ if (((next_seq % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) || next_seq == seq_count)
+ {
+ /* LOG all the sequences synchronized during current batch. */
+ int i = curr_seq - (curr_seq % MAX_SEQUENCES_SYNC_PER_BATCH);
+ for (; i <= curr_seq; i++)
+ {
+ SubscriptionRelState *done_seq;
+ done_seq = (SubscriptionRelState *) lfirst(list_nth_cell(sequences, i));
+ ereport(LOG,
+ errmsg("logical replication synchronization for subscription \"%s\",
sequence \"%s\" has finished",
+ get_subscription_name(subid, false), get_rel_name(done_seq->relid)));
+ }
+
+ CommitTransactionCommand();
+ }
+
+ curr_seq++;

I feel this batching logic needs more comments describing what you are
doing here.

~~~

SequencesyncWorkerMain:
nitpick - spaces in the function comment

======
src/backend/replication/logical/tablesync.c

19. finish_sync_worker

-finish_sync_worker(void)
+finish_sync_worker(bool istable)

IMO, for better readability (here and in the callers) the new
parameter should be the enum LogicalRepWorkerType. Since we have that
enum, might as well make good use of it.

~

nitpick - /sequences synchronization worker/sequence synchronization worker/
nitpick - comment tweak

~

20.
+ char relkind;
+
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ relkind = get_rel_relkind(rstate->relid);
+ if (relkind == RELKIND_SEQUENCE)
+ continue;

I am wondering if it is possible to put the relkind check *before* the
TX code here, because if there are *only* sequences then maybe every
entry would be skipped and there would have been no need for any TX at
all in the first place.

~~~

process_syncing_sequences_for_apply:

nitpick - fix typo and slight reword function header comment. Also
/last start time/last failure time/
nitpick - tweak comments
nitpick - blank lines

~

21.
+ if (!started_tx)
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
+
+ relkind = get_rel_relkind(rstate->relid);
+ if (relkind != RELKIND_SEQUENCE || rstate->state != SUBREL_STATE_INIT)
+ continue;

Wondering (like in review comment #20) if it is possible to swap those
because maybe there was no reason for any TX if the other condition
would always continue.

~~~

22.
+ if (nsyncworkers < max_sync_workers_per_subscription)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ if (!MyLogicalRepWorker->sequencesync_failure_time ||
+ TimestampDifferenceExceeds(MyLogicalRepWorker->sequencesync_failure_time,
+ now, wal_retrieve_retry_interval))
+ {
+ MyLogicalRepWorker->sequencesync_failure_time = 0;

It seems to me that storing 'sequencesync_failure_time' logic may be
unnecessarily complicated. Can't the same "throttling" be achieved by
storing the synchronization worker 'start time' instead of 'fail
time', in which case then you won't have to mess around with
considering if the sync worker failed or just exited normally etc? You
might also be able to remove all the
logicalrep_seqsyncworker_failuretime() exit handler code.
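
A rough standalone sketch of throttling on the start time (not PostgreSQL code). Timestamps are plain milliseconds here, 0 means "never started", and the toy_* names are invented; the real code would use TimestampTz, GetCurrentTimestamp() and TimestampDifferenceExceeds().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef int64_t ToyTimestampMs;	/* stand-in for TimestampTz, in milliseconds */

/* True if a sync worker may be launched now, judged only by its last start. */
static bool
toy_can_launch_seqsync(ToyTimestampMs last_start, ToyTimestampMs now,
					   int retry_interval_ms)
{
	assert(retry_interval_ms >= 0);

	if (last_start == 0)
		return true;			/* never started before */
	return (now - last_start) >= retry_interval_ms;
}

/* Launch attempt: records the start time whenever a launch is allowed. */
static bool
toy_try_launch(ToyTimestampMs *last_start, ToyTimestampMs now,
			   int retry_interval_ms)
{
	if (!toy_can_launch_seqsync(*last_start, now, retry_interval_ms))
		return false;
	*last_start = now;			/* no exit handler is needed to maintain this */
	return true;
}
```

Because the launcher side records the time itself, it does not matter whether the previous worker failed or exited normally; relaunches are still spaced at least one retry interval apart.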

~~~

process_syncing_tables:
nitpick - let's process tables before sequences (because all other
code is generally in this same order)
nitpick - removed some excessive comments about code that is not
supposed to happen

======
src/backend/replication/logical/worker.c

should_apply_changes_for_rel:
nitpick - IMO there were excessive comments for something that is not
going to happen

~~~

23. InitializeLogRepWorker

/*
* Common initialization for leader apply worker, parallel apply worker and
* tablesync worker.
*
* Initialize the database connection, in-memory subscription and necessary
* config options.
*/

That comment (not part of patch 0003) is stale; it should now mention
the sequence sync worker as well, right?

~

nitpick - Tweak plural /sequences sync worker/sequence sync worker/

~~~

24. SetupApplyOrSyncWorker

/* Common function to setup the leader apply or tablesync worker. */

That comment (not part of patch 0003) is stale; it should now mention
the sequence sync worker as well, right?

======
src/include/nodes/parsenodes.h

25.
ALTER_SUBSCRIPTION_ADD_PUBLICATION,
ALTER_SUBSCRIPTION_DROP_PUBLICATION,
ALTER_SUBSCRIPTION_REFRESH,
+ ALTER_SUBSCRIPTION_REFRESH_PUBLICATION_SEQUENCES,

For consistency with your new enum it would be better to also change
the existing enum name ALTER_SUBSCRIPTION_REFRESH ==>
ALTER_SUBSCRIPTION_REFRESH_PUBLICATION.

======
src/include/replication/logicalworker.h

nitpick - IMO should change the function name
/SequencesyncWorkerMain/SequenceSyncWorkerMain/, and in passing make
the same improvement to the TablesyncWorkerMain function name.

======
src/include/replication/worker_internal.h

26.
WORKERTYPE_PARALLEL_APPLY,
+ WORKERTYPE_SEQUENCESYNC,
} LogicalRepWorkerType;

AFAIK the enum order should not matter here so it would be better to
put the WORKERTYPE_SEQUENCESYNC directly after the
WORKERTYPE_TABLESYNC to keep the similar things together.

~

nitpick - IMO change the macro name
/isSequencesyncWorker/isSequenceSyncWorker/, and in passing make the
same improvement to the isTablesyncWorker macro name.

======
src/test/subscription/t/034_sequences.pl

nitpick - Copyright year
nitpick - Modify the "Create subscriber node" comment for consistency
nitpick - Modify comments slightly for the setup structure parts
nitpick - Add or remove various blank lines
nitpick - Since you have sequences 's2' and 's3', IMO it makes more
sense to call the original sequence 's1' instead of just 's'
nitpick - Rearrange so the CREATE PUBLICATION/SUBSCRIPTION can stay together
nitpick - Modified some comment styles to clearly delineate all the
main "TEST" scenarios
nitpick - In the REFRESH PUBLICATION test the create new sequence and
update existing can be combined (like you do in a later test).
nitpick - Changed some of the test messages for REFRESH PUBLICATION
which seemed wrong
nitpick - Added another test for 's1' in REFRESH PUBLICATION SEQUENCES
nitpick - Changed some of the test messages for REFRESH PUBLICATION
SEQUENCES which seemed wrong

~

27.
IIUC the preferred practice is to give these test object names a
'regress_' prefix.

~

28.
+# Check the data on subscriber
+$result = $node_subscriber->safe_psql(
+ 'postgres', qq(
+ SELECT * FROM s;
+));
+
+is($result, '132|0|t', 'initial test data replicated');

28a.
Maybe it is better to say "SELECT last_value, log_cnt, is_called"
instead of "SELECT *" ?
Note - this is in a couple of places.

~

28b.
Can you explain why the expected sequence value is 132, because
AFAICT you only called nextval('s') 100 times, so why isn't it 100?
My guess is that it seems to be related to code in "nextval_internal"
(fetch = log = fetch + SEQ_LOG_VALS;) but it kind of defies
expectations of the test, so if it really is correct then it needs
commentary.

Actually, I found other regression test code that deals with this:
-- log_cnt can be higher if there is a checkpoint just at the right
-- time, so just test for the expected range
SELECT last_value, log_cnt IN (31, 32) AS log_cnt_ok, is_called FROM
foo_seq_new;

Do you have to do something similar? Or is this a bug? See my other
review comments for function fetch_sequence_data in sequencesync.c

======
99.
Please also see the attached diffs patch which implements any nitpicks
mentioned above.

======
Kind Regards,
Peter Smith.
Fujitsu Australia

Attachment Content-Type Size
PS_NITPICKS_20240712_SEQ_0003.txt text/plain 26.9 KB
