Re: Perform streaming logical transactions by background workers and parallel apply

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
Cc: Peter Smith <smithpb2250(at)gmail(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-09-26 10:58:01
Message-ID: CAA4eK1+V7toH38OybLkdAr--+G+DMv+jWHAKOYHBfmWsnaKybw@mail.gmail.com
Lists: pgsql-hackers

On Mon, Sep 26, 2022 at 8:41 AM wangw(dot)fnst(at)fujitsu(dot)com
<wangw(dot)fnst(at)fujitsu(dot)com> wrote:
>
> On Thur, Sep 22, 2022 at 18:12 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> > 3.
> > ApplyWorkerMain()
> > {
> > ...
> > ...
> > +
> > + if (server_version >= 160000 &&
> > + MySubscription->stream == SUBSTREAM_PARALLEL)
> > + options.proto.logical.streaming = pstrdup("parallel");
> >
> > After deciding here whether the parallel streaming mode is enabled or
> > not, we recheck the same thing in apply_handle_stream_abort() and
> > parallel_apply_can_start(). In parallel_apply_can_start(), we do it
> > via two different checks. How about storing this information say in
> > structure MyLogicalRepWorker in ApplyWorkerMain() and then use it at
> > other places?
>
> Improved as suggested.
> Added a new flag "in_parallel_apply" to structure MyLogicalRepWorker.
>

Can we rename the variable in_parallel_apply to parallel_apply and set
it in logicalrep_worker_launch() instead of in
ParallelApplyWorkerMain()?
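
To illustrate the idea of deciding once and reusing the result, here is a minimal sketch. It is not the patch's code: the struct, the enum, and both function names are hypothetical stand-ins, and it assumes the publisher version and the subscription's streaming mode are both known at the point where the flag is set.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not the PostgreSQL definitions. */
typedef enum
{
	SKETCH_STREAM_OFF,
	SKETCH_STREAM_ON,
	SKETCH_STREAM_PARALLEL
} SketchStreamMode;

typedef struct SketchWorker
{
	/* Decided once; later call sites only read it. */
	bool		parallel_apply;
} SketchWorker;

/*
 * Evaluate the version and streaming-mode checks in one place and
 * remember the answer in the worker structure.
 */
static void
sketch_set_parallel_apply(SketchWorker *worker, int server_version,
						  SketchStreamMode stream)
{
	assert(worker != NULL);
	worker->parallel_apply = (server_version >= 160000 &&
							  stream == SKETCH_STREAM_PARALLEL);
}

/*
 * A later call site (for example, deciding whether abort information
 * should be read) consults the flag instead of repeating the checks.
 */
static bool
sketch_should_read_abort_info(const SketchWorker *worker)
{
	assert(worker != NULL);
	return worker->parallel_apply;
}
```

With this shape, every place that currently rechecks the version and the streaming option would reduce to a single read of the stored flag.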

A few other comments:
=====================
1.
+ if (is_subworker &&
+ nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+
+ ereport(DEBUG1,
+ (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+ errmsg("out of parallel apply workers"),
+ errhint("You might need to increase max_parallel_apply_workers_per_subscription.")));

I think it is better to use LOG as the level for this message. Similar
messages at other places use WARNING or LOG. Here, I prefer LOG
because the system can still proceed without blocking anything.

2.
+/* Reset replication origin tracking. */
+void
+parallel_apply_replorigin_reset(void)
+{
+ bool started_tx = false;
+
+ /* This function might be called inside or outside of transaction. */
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }

Why do we need a transaction in this function?

3. A few suggested improvements to the patch:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 1623c9e2fa..d9c519dfab 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1264,6 +1264,10 @@ apply_handle_stream_prepare(StringInfo s)
case TRANS_LEADER_SEND_TO_PARALLEL:
Assert(winfo);

+ /*
+ * The origin can be active only in one process. See
+ * apply_handle_stream_commit.
+ */
parallel_apply_replorigin_reset();

/* Send STREAM PREPARE message to the parallel apply worker. */
@@ -1623,12 +1627,7 @@ apply_handle_stream_abort(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("STREAM ABORT message without STREAM STOP")));

- /*
- * Check whether the publisher sends abort_lsn and abort_time.
- *
- * Note that the parallel apply worker is only started when the publisher
- * sends abort_lsn and abort_time.
- */
+ /* We receive abort information only when we can apply in parallel. */
if (MyLogicalRepWorker->in_parallel_apply)
read_abort_info = true;

@@ -1656,7 +1655,13 @@ apply_handle_stream_abort(StringInfo s)
Assert(winfo);

if (subxid == xid)
+ {
+ /*
+ * The origin can be active only in one process. See
+ * apply_handle_stream_commit.
+ */
parallel_apply_replorigin_reset();
+ }

/* Send STREAM ABORT message to the parallel apply worker. */
parallel_apply_send_data(winfo, s->len, s->data);
@@ -1858,6 +1863,12 @@ apply_handle_stream_commit(StringInfo s)
case TRANS_LEADER_SEND_TO_PARALLEL:
Assert(winfo);

+ /*
+ * We need to reset the replication origin before sending the commit
+ * message and set it up again after confirming that parallel worker
+ * has processed the message. This is required because origin can be
+ * active only in one process at-a-time.
+ */
parallel_apply_replorigin_reset();

/* Send STREAM COMMIT message to the parallel apply worker. */
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 4cbfb43492..2bd9664f86 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -70,11 +70,7 @@ typedef struct LogicalRepWorker
*/
pid_t apply_leader_pid;

- /*
- * Indicates whether to use parallel apply workers.
- *
- * Determined based on streaming parameter and publisher version.
- */
+ /* Indicates whether apply can be performed parallelly. */
bool in_parallel_apply;
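
The comments added above rely on the rule that an origin can be active in only one process at a time. As a rough model of that handoff, here is a single-threaded sketch. All names are hypothetical and it does not use the real replication origin API; the actual code works on shared memory under locks, which this sketch leaves out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_NO_OWNER (-1)

/* Hypothetical model of an origin that at most one process may hold. */
typedef struct SketchOrigin
{
	int			owner_pid;		/* SKETCH_NO_OWNER when nobody holds it */
} SketchOrigin;

/* Succeeds only if the origin is free or already held by this process. */
static bool
sketch_origin_acquire(SketchOrigin *origin, int pid)
{
	assert(origin != NULL);
	if (origin->owner_pid != SKETCH_NO_OWNER && origin->owner_pid != pid)
		return false;
	origin->owner_pid = pid;
	return true;
}

/* Give up the origin so that another process can set it up. */
static void
sketch_origin_reset(SketchOrigin *origin, int pid)
{
	assert(origin != NULL);
	if (origin->owner_pid == pid)
		origin->owner_pid = SKETCH_NO_OWNER;
}
```

In this model the leader has to call sketch_origin_reset() before the parallel worker's sketch_origin_acquire() can succeed, and it can acquire the origin again only after the worker has released it. That mirrors the ordering the comment in apply_handle_stream_commit describes.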

--
With Regards,
Amit Kapila.
