RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-21 12:36:09
Message-ID: OS0PR01MB571680391393F3CB63469F3E940A9@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Monday, November 21, 2022 2:26 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> On Fri, Nov 18, 2022 at 6:03 PM Peter Smith <smithpb2250(at)gmail(dot)com>
> wrote:
> >
> > Here are some review comments for v47-0001
> >
> > (This review is a WIP - I will post more comments for this patch next
> > week)
> >
>
> Here are the rest of my comments for v47-0001

Thanks for the comments!

> ======
>
> doc/src/sgml/monitoring.
>
> 1.
>
> @@ -1851,6 +1851,11 @@ postgres 27093 0.0 0.0 30096 2752 ?
> Ss 11:34 0:00 postgres: ser
> <entry>Waiting to acquire an advisory user lock.</entry>
> </row>
> <row>
> + <entry><literal>applytransaction</literal></entry>
> + <entry>Waiting to acquire acquire a lock on a remote transaction being
> + applied on the subscriber side.</entry>
> + </row>
> + <row>
>
> 1a.
> Typo "acquire acquire"

Fixed.

> ~
>
> 1b.
> Maybe "on the subscriber side" does not mean much without any context.
> Maybe better to word it as below.
>
> SUGGESTION
> Waiting to acquire a lock on a remote transaction being applied by a logical
> replication subscriber.

Changed.

> ======
>
> doc/src/sgml/system-views.sgml
>
> 2.
>
> @@ -1361,8 +1361,9 @@
> <literal>virtualxid</literal>,
> <literal>spectoken</literal>,
> <literal>object</literal>,
> - <literal>userlock</literal>, or
> - <literal>advisory</literal>.
> + <literal>userlock</literal>,
> + <literal>advisory</literal> or
> + <literal>applytransaction</literal>.
>
> This change removed the Oxford comma that was there before. I assume it was
> unintended.

Changed.

> ======
>
> .../replication/logical/applyparallelworker.c
>
> 3. globals
>
> The parallel_apply_XXX functions were all shortened to pa_XXX.
>
> I wondered if the same simplification should be done also to the global
> statics...
>
> e.g.
> ParallelApplyWorkersHash -> PAWorkerHash
> ParallelApplyWorkersList -> PAWorkerList
> ParallelApplyMessagePending -> PAMessagePending
> etc...

Personally, I think the current names look fine.

> ~~~
>
> 4. pa_get_free_worker
>
> + foreach(lc, active_workers)
> + {
> + ParallelApplyWorkerInfo *winfo = NULL;
> +
> + winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
>
> No need to assign NULL because the next line just overwrites that anyhow.

Changed.

> ~
>
> 5.
>
> + /*
> + * Try to free the worker first, because we don't wait for the rollback
> + * command to finish so the worker may not be freed at the end of the
> + * transaction.
> + */
> + if (pa_free_worker(winfo, winfo->shared->xid))
> + continue;
> +
> + if (!winfo->in_use)
> + return winfo;
>
> Shouldn't the (!winfo->in_use) check be done first as well -- e.g. why are we
> trying to free a worker which is maybe not even in_use?
>
> SUGGESTION (this will need some comment to explain what it is doing)
> if (!winfo->in_use || !pa_free_worker(winfo, winfo->shared->xid) &&
> !winfo->in_use)
> return winfo;

Since pa_free_worker checks the in_use flag as well, and the current style
looks clean to me, I didn't change this.

But it seems we need to call pa_free_worker for every worker first and then
choose a free one; otherwise the info of a stopped worker (shared memory or ...)
might be left around for a long time. I will think about this and try to fix it
in the next version.
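
To make the idea concrete, here is a minimal sketch of the "free everything
first, then pick" ordering. It is not the patch code: WorkerSlot, try_release()
and get_free_slot() are hypothetical stand-ins for ParallelApplyWorkerInfo,
pa_free_worker() and pa_get_free_worker(), and the "finished" flag is an
assumption used only to simulate a transaction that has ended.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ParallelApplyWorkerInfo (not the patch's struct). */
typedef struct WorkerSlot
{
    bool in_use;    /* slot is still assigned to a transaction */
    bool finished;  /* that transaction has ended, so the slot is reclaimable */
} WorkerSlot;

/*
 * Hypothetical analogue of pa_free_worker(): release the slot if its
 * transaction has finished. It checks in_use itself, so callers need not.
 */
static bool
try_release(WorkerSlot *slot)
{
    if (!slot->in_use || !slot->finished)
        return false;

    slot->in_use = false;
    slot->finished = false;
    return true;
}

/* Release every reclaimable slot first, then return the first free one. */
WorkerSlot *
get_free_slot(WorkerSlot *slots, size_t nslots)
{
    assert(slots != NULL || nslots == 0);

    /* Pass 1: visit all slots so that no finished one is left behind. */
    for (size_t i = 0; i < nslots; i++)
        (void) try_release(&slots[i]);

    /* Pass 2: choose the first slot that is not in use. */
    for (size_t i = 0; i < nslots; i++)
    {
        if (!slots[i].in_use)
            return &slots[i];
    }

    return NULL;
}
```

With a single pass that returns as soon as it finds a free slot, any finished
slot later in the list would stay unreleased until some later call happens to
reach it, which is the situation described above.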

> ~~~
>
> 6. pa_free_worker
>
> +/*
> + * Remove the parallel apply worker entry from the hash table. Stop the work if
> + * there are enough workers in the pool.
> + *
>
> Typo? "work" -> "worker"
>

Fixed.

>
> 7.
>
> + /* Are there enough workers in the pool? */
> + if (napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
> + {
>
> IMO that comment should be something more like "Don't detach/stop the
> worker unless..."
>

Improved.

>
> 8. pa_send_data
>
> + /*
> + * Retry after 1s to reduce the cost of getting the system time and
> + * calculating the time difference.
> + */
> + (void) WaitLatch(MyLatch,
> + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, 1000L,
> + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
>
> 8a.
> I am not sure you need to explain the reason in the comment. Just saying "Wait
> before retrying." seems sufficient to me.

Changed.

> ~
>
> 8b.
> Instead of the hardwired "1s" in the comment, and 1000L in the code, maybe
> better to just have another constant.
>
> SUGGESTION
> #define SHM_SEND_RETRY_INTERVAL_MS 1000
> #define SHM_SEND_TIMEOUT_MS 10000

Changed.

> ~
>
> 9.
>
> + if (startTime == 0)
> + startTime = GetCurrentTimestamp();
> + else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
>
> IMO the initial startTime should be at top of the function otherwise the timeout
> calculation seems wrong.

Setting startTime at the beginning would add unnecessary cost when we don't need
to retry, and starting the count from the first failure looks fine to me.
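
A minimal sketch of that pattern follows. It is not the patch's pa_send_data():
send_with_timeout(), the callback types, and the fake clock helpers are
hypothetical names introduced for illustration, and the two constants simply
reuse the names suggested in 8b. The clock is read only after the first failed
attempt, so a send that succeeds immediately never pays for it.

```c
#include <stdbool.h>
#include <stdint.h>

#define SHM_SEND_RETRY_INTERVAL_MS 1000
#define SHM_SEND_TIMEOUT_MS 10000

/* Hypothetical callbacks so the loop can be exercised without real I/O. */
typedef bool (*try_send_fn) (void *arg);
typedef int64_t (*now_ms_fn) (void);
typedef void (*sleep_ms_fn) (int64_t ms);

/*
 * Retry try_send until it succeeds or SHM_SEND_TIMEOUT_MS has elapsed since
 * the first failure. Returns true on success, false on timeout.
 */
bool
send_with_timeout(try_send_fn try_send, void *arg,
                  now_ms_fn now_ms, sleep_ms_fn sleep_ms)
{
    bool    timing = false;     /* has the first failure been recorded? */
    int64_t start_ms = 0;

    for (;;)
    {
        if (try_send(arg))
            return true;

        if (!timing)
        {
            /* First failure: start the timeout clock only now. */
            start_ms = now_ms();
            timing = true;
        }
        else if (now_ms() - start_ms >= SHM_SEND_TIMEOUT_MS)
            return false;

        /* Wait before retrying. */
        sleep_ms(SHM_SEND_RETRY_INTERVAL_MS);
    }
}

/* Simulated clock and sender, for illustration only. */
static int64_t fake_now_ms = 0;

static int64_t
fake_clock(void)
{
    return fake_now_ms;
}

static void
fake_sleep(int64_t ms)
{
    fake_now_ms += ms;
}

/* Fails as many times as *arg says, then succeeds. */
static bool
fail_n_times(void *arg)
{
    int *remaining = arg;

    if (*remaining > 0)
    {
        (*remaining)--;
        return false;
    }
    return true;
}
```

For example, calling send_with_timeout(fail_n_times, &n, fake_clock, fake_sleep)
with n = 3 succeeds on the fourth attempt, while a sender that never succeeds
gives up once the simulated clock is 10 seconds past the first failure.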

> ======
>
> src/backend/replication/logical/worker.c
>
> 10. handle_streamed_transaction
>
> + * In streaming case (receiving a block of streamed transaction), for
> + * SUBSTREAM_ON mode, simply redirect it to a file for the proper toplevel
> + * transaction, and for SUBSTREAM_PARALLEL mode, send the changes to parallel
> + * apply workers (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes
> + * will be applied by both leader apply worker and parallel apply workers).
>
> I'm not sure this function comment should be referring to SUBSTREAM_ON
> and SUBSTREAM_PARALLEL because the function body does not use those
> anywhere in the logic.

Improved.

> ~~~
>
> 11. apply_handle_stream_start
>
> + /*
> + * Increment the number of messages waiting to be processed by
> + * parallel apply worker.
> + */
> + pg_atomic_add_fetch_u32(&(winfo->shared->pending_message_count), 1);
> +
>
> The &() parens are not needed. Just write
> &winfo->shared->pending_message_count.
>
> Also, search/replace others like this -- there are a few of them.

Changed.

> ~~~
>
> 12. apply_handle_stream_stop
>
> + if (!abort_toplevel_transaction &&
> + pg_atomic_sub_fetch_u32(&(MyParallelShared->pending_message_count), 1) == 0)
> + {
> + pa_lock_stream(MyParallelShared->xid, AccessShareLock);
> + pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
> + }
>
> That lock/unlock seems like it is done just as a way of testing/waiting for an
> exclusive lock held on the xid to be released.
> But the code is too tricky -- IMO it needs a big comment saying how this trick
> works, or maybe better to have a wrapper function for this for clarity. e.g.
> pa_wait_nolock_stream(xid); (or some better name)

I think the comments atop applyparallelworker.c explain the usage of the
stream/transaction lock.

```
...
* In order for lmgr to detect this, we have LA acquire a session lock on the
* remote transaction (by pa_lock_stream()) and have PA wait on the lock before
* trying to receive messages. In other words, LA acquires the lock before
* sending STREAM_STOP and releases it if already acquired before sending
* STREAM_START, STREAM_ABORT(for toplevel transaction), STREAM_PREPARE and
* STREAM_COMMIT. For PA, it always needs to acquire the lock after processing
* STREAM_STOP and then release immediately after acquiring it. That way, when
* PA is waiting for LA, we can have a wait-edge from PA to LA in lmgr, which
* will make a deadlock in lmgr like:
...
```

> ~~~
>
> 13. apply_handle_stream_abort
>
> + if (abort_toplevel_transaction)
> + {
> + (void) pa_free_worker(winfo, xid);
> + }
>
> Unnecessary { }

Removed.

> ~~~
>
> 14. maybe_reread_subscription
>
> @@ -3083,8 +3563,9 @@ maybe_reread_subscription(void)
> if (!newsub)
> {
> ereport(LOG,
> - (errmsg("logical replication apply worker for subscription \"%s\" will "
> - "stop because the subscription was removed",
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\" will stop because the "
> + "subscription was removed", get_worker_name(),
> MySubscription->name)));
>
> proc_exit(0);
> @@ -3094,8 +3575,9 @@ maybe_reread_subscription(void)
> if (!newsub->enabled)
> {
> ereport(LOG,
> - (errmsg("logical replication apply worker for subscription \"%s\" will "
> - "stop because the subscription was disabled",
> + /* translator: first %s is the name of logical replication worker */
> + (errmsg("%s for subscription \"%s\" will stop because the "
> + "subscription was disabled", get_worker_name(),
> MySubscription->name)));
>
> IMO better to avoid splitting the string literals over multiple line like this.
>
> Please check the rest of the patch too -- there may be many more just like this.

Changed.

> ~~~
>
> 15. ApplyWorkerMain
>
> @@ -3726,7 +4236,7 @@ ApplyWorkerMain(Datum main_arg)
> }
> else
> {
> - /* This is main apply worker */
> + /* This is leader apply worker */
> RepOriginId originid;
> "This is leader" -> "This is the leader"

Changed.

> ======
>
> src/bin/psql/describe.c
>
> 16. describeSubscriptions
>
> + if (pset.sversion >= 160000)
> + appendPQExpBuffer(&buf,
> + ", (CASE substream\n"
> + " WHEN 'f' THEN 'off'\n"
> + " WHEN 't' THEN 'on'\n"
> + " WHEN 'p' THEN 'parallel'\n"
> + " END) AS \"%s\"\n",
> + gettext_noop("Streaming"));
> + else
> + appendPQExpBuffer(&buf,
> + ", substream AS \"%s\"\n",
> + gettext_noop("Streaming"));
>
> I'm not sure it is an improvement to change the output "t/f/p" to
> "on/off/parallel"
>
> IMO "t/f/parallel" would be better. Then the t/f is consistent with
> - how it used to display, and
> - all the other boolean fields

I think the current style is consistent with the "Synchronous commit" column, which
also shows "on/off/remote_apply/...", so I didn't change this.

 Name | ... | Synchronous commit
------+-----+--------------------
 sub  | ... | on
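
For reference, the same three-way mapping as the CASE expression in the quoted
hunk can be written as a small lookup. This is only an illustrative sketch, and
substream_label() is a hypothetical helper, not something in psql:

```c
#include <stddef.h>

/*
 * Map the substream value to the label shown in the "Streaming" column.
 * Returns NULL for values the CASE expression does not cover.
 */
const char *
substream_label(char substream)
{
    switch (substream)
    {
        case 'f':
            return "off";
        case 't':
            return "on";
        case 'p':
            return "parallel";
        default:
            return NULL;
    }
}
```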

> ======
>
> src/include/replication/worker_internal.h
>
> 17. ParallelTransState
>
> +/*
> + * State of the transaction in parallel apply worker.
> + *
> + * These enum values are ordered by the order of transaction state changes in
> + * parallel apply worker.
> + */
> +typedef enum ParallelTransState
>
> "ordered by the order" ??
>
> SUGGESTION
> The enum values must have the same order as the transaction state transitions.

Changed.

> ======
>
> src/include/storage/lock.h
>
> 18.
>
> @@ -149,10 +149,12 @@ typedef enum LockTagType
> LOCKTAG_SPECULATIVE_TOKEN, /* speculative insertion Xid and token */
> LOCKTAG_OBJECT, /* non-relation database object */
> LOCKTAG_USERLOCK, /* reserved for old contrib/userlock code */
> - LOCKTAG_ADVISORY /* advisory user locks */
> + LOCKTAG_ADVISORY, /* advisory user locks */
> + LOCKTAG_APPLY_TRANSACTION /* transaction being applied on the subscriber
> + * side */
> } LockTagType;
>
> -#define LOCKTAG_LAST_TYPE LOCKTAG_ADVISORY
> +#define LOCKTAG_LAST_TYPE LOCKTAG_APPLY_TRANSACTION
>
> extern PGDLLIMPORT const char *const LockTagTypeNames[];
>
> @@ -278,6 +280,17 @@ typedef struct LOCKTAG
> (locktag).locktag_type = LOCKTAG_ADVISORY, \
> (locktag).locktag_lockmethodid = USER_LOCKMETHOD)
>
> +/*
> + * ID info for a remote transaction on the subscriber side is:
> + * DB OID + SUBSCRIPTION OID + TRANSACTION ID + OBJID
> + */
> +#define SET_LOCKTAG_APPLY_TRANSACTION(locktag,dboid,suboid,xid,objid) \
> + ((locktag).locktag_field1 = (dboid), \
> + (locktag).locktag_field2 = (suboid), \
> + (locktag).locktag_field3 = (xid), \
> + (locktag).locktag_field4 = (objid), \
> + (locktag).locktag_type = LOCKTAG_APPLY_TRANSACTION, \
> + (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD)
>
> Maybe "on the subscriber side" (2 places above) has no meaning here because
> there is no context this is talking about logical replication.
> Maybe those comments need to say something more like "on a logical
> replication subscriber"
>

Changed.

I also addressed all the comments from [1].

[1] https://www.postgresql.org/message-id/CAHut%2BPs7TzqqDnuH8r_ct1W_zSBCnuo3wodMt4Y8_Gw7rSRAaw%40mail.gmail.com

Best regards,
Hou zj
