From: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> |
---|---|
To: | Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> |
Cc: | "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org> |
Subject: | Re: Perform streaming logical transactions by background workers and parallel apply |
Date: | 2022-12-08 07:42:13 |
Message-ID: | CAA4eK1+YJahJtdn+czAj8K8Pe5Y6UnidwDwi7m9xctB+xmw=-A@mail.gmail.com |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-hackers |
On Thu, Dec 8, 2022 at 12:42 PM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
>
> On Wed, Dec 7, 2022 at 10:03 PM houzj(dot)fnst(at)fujitsu(dot)com
> <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> >
> > > +static void
> > > +ProcessParallelApplyInterrupts(void)
> > > +{
> > > + CHECK_FOR_INTERRUPTS();
> > > +
> > > + if (ShutdownRequestPending)
> > > + {
> > > + ereport(LOG,
> > > + (errmsg("logical replication parallel
> > > apply worker for subscrip
> > > tion \"%s\" has finished",
> > > + MySubscription->name)));
> > > +
> > > + apply_worker_clean_exit(false);
> > > + }
> > > +
> > > + if (ConfigReloadPending)
> > > + {
> > > + ConfigReloadPending = false;
> > > + ProcessConfigFile(PGC_SIGHUP);
> > > + }
> > > +}
> > >
> > > I personally think that we don't need to have a function to do only
> > > these few things.
> >
> > I thought that introduce a new function make the handling of worker specific
> > Interrupts logic similar to other existing ones. Like:
> > ProcessWalRcvInterrupts () in walreceiver.c and HandlePgArchInterrupts() in
> > pgarch.c ...
>
> I think the difference from them is that there is only one place to
> call ProcessParallelApplyInterrupts().
>
But I feel it is better to isolate this code in a separate function.
What if we decide to extend it further by having some logic to stop
workers after reloading of config?
> >
> > > ---
> > > server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
> > > options.proto.logical.proto_version =
> > > + server_version >= 160000 ?
> > > LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM :
> > > server_version >= 150000 ?
> > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM :
> > > server_version >= 140000 ?
> > > LOGICALREP_PROTO_STREAM_VERSION_NUM :
> > > LOGICALREP_PROTO_VERSION_NUM;
> > >
> > > Instead of always using the new protocol version, I think we can use
> > > LOGICALREP_PROTO_TWOPHASE_VERSION_NUM if the streaming is not
> > > 'parallel'. That way, we don't need to change protocl version check
> > > logic in pgoutput.c and don't need to expose defGetStreamingMode().
> > > What do you think?
> >
> > I think that some user can also use the new version number when trying to get
> > changes (via pg_logical_slot_peek_binary_changes or other functions), so I feel
> > leave the check for new version number seems fine.
> >
> > Besides, I feel even if we don't use new version number, we still need to use
> > defGetStreamingMode to check if parallel mode in used as we need to send
> > abort_lsn when parallel is in used. I might be missing something, sorry for
> > that. Can you please explain the idea a bit ?
>
> My idea is that we use LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM if
> (server_version >= 160000 && MySubscription->stream ==
> SUBSTREAM_PARALLEL). If the stream is SUBSTREAM_ON, we use
> LOGICALREP_PROTO_TWOPHASE_VERSION_NUM even if server_version is
> 160000. That way, in pgoutput.c, we can send abort_lsn if the protocol
> version is LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM. We don't need
> to send "streaming = parallel" to the publisher since the publisher
> can decide whether or not to send abort_lsn based on the protocol
> version (still needs to send "streaming = on" though). I might be
> missing something.
>
What if we decide to send some more additional information as part of
another patch like we are discussing in the thread [1]? Now, we won't
be able to decide the version number based on just the streaming
option. Also, in such a case, even for
LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM, it may not be a good
idea to send additional abort information unless the user has used the
streaming=parallel option.
--
With Regards,
Amit Kapila.
From | Date | Subject | |
---|---|---|---|
Next Message | Amit Langote | 2022-12-08 07:59:46 | Re: Allow batched insert during cross-partition updates |
Previous Message | Etsuro Fujita | 2022-12-08 07:39:31 | Re: postgres_fdw: batch inserts vs. before row triggers |