From: | "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com> |
---|---|
To: | Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> |
Cc: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org> |
Subject: | RE: Perform streaming logical transactions by background workers and parallel apply |
Date: | 2022-12-07 13:03:37 |
Message-ID: | OS0PR01MB5716E527412A3481F90B4397941A9@OS0PR01MB5716.jpnprd01.prod.outlook.com |
Lists: | pgsql-hackers |
On Wednesday, December 7, 2022 7:51 PM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
>
> On Mon, Dec 5, 2022 at 1:29 PM houzj(dot)fnst(at)fujitsu(dot)com <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > On Sunday, December 4, 2022 7:17 PM houzj(dot)fnst(at)fujitsu(dot)com <houzj(dot)fnst(at)fujitsu(dot)com>
> > >
> > > Thursday, December 1, 2022 8:40 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
> > > > Some other comments:
> > > ...
> > > Attached is the new version of the patch set, which addresses most of the comments
> > > received so far, except for some that are still being discussed[1].
> > > [1] https://www.postgresql.org/message-id/OS0PR01MB57167BF64FC0891734C8E81A94149%40OS0PR01MB5716.jpnprd01.prod.outlook.com
> >
> > Attached is a new version of the patch set, which fixes a test case failure on CFbot.
>
> Here are some comments on v56 0001, 0002 patches. Please ignore
> comments if you already incorporated them in v57.
Thanks for the comments!
> +static void
> +ProcessParallelApplyInterrupts(void)
> +{
> + CHECK_FOR_INTERRUPTS();
> +
> + if (ShutdownRequestPending)
> + {
> + ereport(LOG,
> + (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
> + MySubscription->name)));
> +
> + apply_worker_clean_exit(false);
> + }
> +
> + if (ConfigReloadPending)
> + {
> + ConfigReloadPending = false;
> + ProcessConfigFile(PGC_SIGHUP);
> + }
> +}
>
> I personally think that we don't need to have a function to do only
> these few things.
I thought that introducing a new function makes the handling of worker-specific
interrupts consistent with the existing ones, like ProcessWalRcvInterrupts() in
walreceiver.c and HandlePgArchInterrupts() in pgarch.c ...
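For readers unfamiliar with the convention, the sketch below mimics the shape of such a per-worker interrupt function outside the server. It is hypothetical and self-contained: the flags are plain `volatile sig_atomic_t` variables (in a real worker only signal handlers would set them), and `process_worker_interrupts()`, `reload_config()` and the request helpers are illustrative names, not PostgreSQL APIs.

```c
#include <assert.h>
#include <signal.h>
#include <stdbool.h>

/* Hypothetical stand-ins for ShutdownRequestPending / ConfigReloadPending. */
static volatile sig_atomic_t shutdown_pending = 0;
static volatile sig_atomic_t reload_pending = 0;
static int reload_count = 0;

/* In a real worker, signal handlers would do nothing but set these flags. */
static void request_shutdown(void) { shutdown_pending = 1; }
static void request_reload(void)   { reload_pending = 1; }

/* Hypothetical stand-in for ProcessConfigFile(PGC_SIGHUP). */
static void reload_config(void) { reload_count++; }

/*
 * All worker-specific interrupt checks live in one function, in the spirit
 * of ProcessWalRcvInterrupts() or HandlePgArchInterrupts().  Returns false
 * when the caller should perform a clean exit.
 */
static bool process_worker_interrupts(void)
{
    if (shutdown_pending)
        return false;

    if (reload_pending)
    {
        /* Clear first, so a SIGHUP arriving during the reload is not lost. */
        reload_pending = 0;
        reload_config();
    }
    return true;
}
```

The worker's main loop would call this once per iteration; keeping the checks in one place is what makes it comparable to the existing handlers.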
>
> Should we change the names to something like
> LOGICALREP_STREAM_PARALLEL?
Agreed, will change.
> ---
> + * The lock graph for the above example will look as follows:
> + * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
> + * acquire the lock on the remote transaction) -> LA
>
> and
>
> + * The lock graph for the above example will look as follows:
> + * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
> + * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
> + * lock) -> LA
>
> "(waiting to acquire the lock on the remote transaction)" in the first
> example and "(waiting to acquire the stream lock)" in the second
> example mean the same thing, right? If so, I think we should use one
> of the two terms consistently.
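The lock graphs in these comments are wait-for graphs, and the deadlock is a cycle in them. As a hypothetical illustration (not PostgreSQL's own deadlock detector, which handles many waiters per lock), the sketch below encodes the second example, LA -> PA-2 -> PA-1 -> LA, assuming each process waits on at most one other, and checks whether a walk from a given process returns to it.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical ids for the processes in the second example. */
enum { LA, PA1, PA2, NPROCS };

/*
 * waits_for[i] is the process that i is blocked on, or -1 if i is running.
 * With at most one outgoing edge per process, `start` is deadlocked exactly
 * when following the edges from it leads back to it within n steps.
 */
static bool in_deadlock(const int waits_for[], int n, int start)
{
    assert(start >= 0 && start < n);
    int cur = start;
    for (int steps = 0; steps < n; steps++)
    {
        cur = waits_for[cur];
        if (cur < 0)
            return false;       /* chain ends at a running process */
        if (cur == start)
            return true;        /* walked back to start: a cycle */
    }
    return false;               /* stuck in a cycle that excludes start */
}
```

With LA waiting on PA-2 (transaction lock), PA-2 on PA-1 (unique index) and PA-1 on LA (stream lock), every one of the three processes reports a deadlock.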
Will change.
> ---
> + bool write_abort_info = (data->streaming == SUBSTREAM_PARALLEL);
>
> I think that instead of setting write_abort_info every time
> pgoutput_stream_abort() is called, we can set it once at startup,
> probably in PGOutputData.
Since we already have a "stream" flag in PGOutputData, I am not sure it would
be better to introduce another flag for the same option.
> ---
> server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> options.proto.logical.proto_version =
> + server_version >= 160000 ? LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
> server_version >= 150000 ? LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
> server_version >= 140000 ? LOGICALREP_PROTO_STREAM_VERSION_NUM :
> LOGICALREP_PROTO_VERSION_NUM;
>
> Instead of always using the new protocol version, I think we can use
> LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> 'parallel'. That way, we don't need to change the protocol version check
> logic in pgoutput.c and don't need to expose defGetStreamingMode().
> What do you think?
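A hypothetical sketch of the two selection strategies under discussion. The constant values are assumed to match those in PostgreSQL 16's `logicalproto.h` (1 through 4); the `PROTO_*` names and both functions are invented for illustration. Under the suggested variant, the subscriber requests the parallel-streaming protocol only when `streaming = parallel`.

```c
#include <assert.h>
#include <stdbool.h>

/* Assumed to mirror the LOGICALREP_PROTO_* values; illustrative names. */
#define PROTO_VERSION_NUM                 1
#define PROTO_STREAM_VERSION_NUM          2
#define PROTO_TWOPHASE_VERSION_NUM        3
#define PROTO_STREAM_PARALLEL_VERSION_NUM 4

/* Patch behaviour: always pick the newest version the server supports. */
static int proto_version_always_newest(int server_version)
{
    return server_version >= 160000 ? PROTO_STREAM_PARALLEL_VERSION_NUM :
           server_version >= 150000 ? PROTO_TWOPHASE_VERSION_NUM :
           server_version >= 140000 ? PROTO_STREAM_VERSION_NUM :
           PROTO_VERSION_NUM;
}

/* Suggested alternative: request version 4 only for streaming = parallel. */
static int proto_version_by_mode(int server_version, bool parallel)
{
    int v = proto_version_always_newest(server_version);

    if (v == PROTO_STREAM_PARALLEL_VERSION_NUM && !parallel)
        v = PROTO_TWOPHASE_VERSION_NUM;
    assert(v >= PROTO_VERSION_NUM && v <= PROTO_STREAM_PARALLEL_VERSION_NUM);
    return v;
}
```

For servers older than 16 both functions agree; they differ only for a 16+ server when streaming is not 'parallel'.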
I think that some users may also use the new version number when getting
changes (via pg_logical_slot_peek_binary_changes or other functions), so I feel
it is fine to keep the check for the new version number.
Besides, even if we don't use the new version number, I feel we still need to use
defGetStreamingMode to check whether parallel mode is in use, as we need to send
abort_lsn in that case. I might be missing something, sorry for that. Can you
please explain the idea a bit?
> ---
> When max_parallel_apply_workers_per_subscription is changed to a value
> lower than the number of parallel workers running at that time, do we
> need to stop the extra workers?
I think we can do this, for example by adding a check in the main loop of the
leader worker that runs every time after reloading the configuration. OTOH, we
will also stop the worker after it finishes a transaction, so I am not sure we
need to add another check here. But I am fine with adding it if you think it
would be better.
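A hypothetical sketch of the check described above, run by the leader after each configuration reload. It assumes the leader tracks its workers in a plain array (`PoolSlot` and `trim_idle_workers()` are invented names) and leaves busy workers alone, since those would be stopped anyway once their transaction finishes.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_POOL 8

/* Hypothetical pool entry for one running parallel apply worker. */
typedef struct
{
    bool in_use;
    bool busy;      /* currently applying a streamed transaction */
} PoolSlot;

/*
 * Stop idle workers until at most max_workers remain running.
 * Returns the number of workers stopped.
 */
static int trim_idle_workers(PoolSlot pool[], int npool, int max_workers)
{
    int running = 0;
    int stopped = 0;

    assert(npool <= MAX_POOL && max_workers >= 0);
    for (int i = 0; i < npool; i++)
        if (pool[i].in_use)
            running++;

    for (int i = 0; i < npool && running > max_workers; i++)
    {
        if (pool[i].in_use && !pool[i].busy)
        {
            pool[i].in_use = false;     /* stand-in for signalling the worker */
            running--;
            stopped++;
        }
    }
    return stopped;
}
```

Skipping busy workers keeps the check cheap and non-disruptive; the pool may temporarily exceed the new limit until those workers finish.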
> ---
> If a value of max_parallel_apply_workers_per_subscription is not
> sufficient, we get the LOG "out of parallel apply workers" every time
> when the apply worker doesn't launch a worker. But do we really need
> this log? It seems not consistent with
> max_sync_workers_per_subscription behavior. I think we can check if
> the number of running parallel workers is less than
> max_parallel_apply_workers_per_subscription before calling
> logicalrep_worker_launch(). What do you think?
>
> ---
> + if (server_version >= 160000 &&
> + MySubscription->stream == SUBSTREAM_PARALLEL)
> + {
> + options.proto.logical.streaming_str = pstrdup("parallel");
> + MyLogicalRepWorker->parallel_apply = true;
> + }
> + else if (server_version >= 140000 &&
> + MySubscription->stream != SUBSTREAM_OFF)
> + {
> + options.proto.logical.streaming_str = pstrdup("on");
> + MyLogicalRepWorker->parallel_apply = false;
> + }
>
> I think we don't need to use pstrdup().
Will remove.
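The reason `pstrdup()` is unnecessary here: string literals have static storage duration in C, so a pointer to one stays valid for the life of the process, as long as nothing later modifies or pfree()s the string. A hypothetical sketch of the same selection returning literals directly (`SubStream` and `streaming_option()` are invented names; returning NULL for "no streaming" is an assumption of this sketch):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of the subscription streaming setting. */
typedef enum { SUB_STREAM_OFF, SUB_STREAM_ON, SUB_STREAM_PARALLEL } SubStream;

/* Pick the streaming option string without copying it. */
static const char *streaming_option(int server_version, SubStream stream,
                                    bool *parallel_apply)
{
    assert(parallel_apply != NULL);
    *parallel_apply = false;

    if (server_version >= 160000 && stream == SUB_STREAM_PARALLEL)
    {
        *parallel_apply = true;
        return "parallel";      /* literal: valid for the whole process */
    }
    if (server_version >= 140000 && stream != SUB_STREAM_OFF)
        return "on";
    return NULL;
}
```

As in the quoted code, a 'parallel' subscription connecting to a pre-16 server falls back to plain "on".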
> ---
> - BeginTransactionBlock();
> - CommitTransactionCommand(); /* Completes the preceding Begin command. */
> + if (!IsTransactionBlock())
> + {
> + BeginTransactionBlock();
> + CommitTransactionCommand(); /* Completes the preceding Begin command. */
> + }
>
> Do we need this change? In my environment, 'make check-world' passes
> without this change.
We start a transaction block when defining the savepoint, so we would get a
warning[1] if we entered this function later. I think there would be some
WARNINGs in the log of the "022_twophase_cascade" test if we removed this check.
[1] WARNING: there is already a transaction in progress
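A toy model of why the `IsTransactionBlock()` guard matters, under heavy simplification: a single boolean stands in for the backend's transaction-block state, and `define_savepoint()`, `begin_block()` and `begin_prepared_block()` are hypothetical names, not xact.c functions. Once defining a savepoint has opened a block, an unguarded begin produces the warning in [1].

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical, much-simplified model of the transaction-block state. */
static bool in_transaction_block = false;
static int  warnings = 0;

/* Stand-in for BeginTransactionBlock(): warns if a block is already open. */
static void begin_block(void)
{
    if (in_transaction_block)
    {
        warnings++;
        fprintf(stderr, "WARNING: there is already a transaction in progress\n");
        return;
    }
    in_transaction_block = true;
}

/* In this model, defining a savepoint opens a block if none is open. */
static void define_savepoint(void)
{
    if (!in_transaction_block)
        begin_block();
    assert(in_transaction_block);
}

/* The patched code path: begin a block only if none is open yet. */
static void begin_prepared_block(bool guarded)
{
    if (!guarded || !in_transaction_block)
        begin_block();
}
```

After `define_savepoint()`, the guarded call is silent while the unguarded one emits the warning once.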
Best regards,
Hou zj