RE: Perform streaming logical transactions by background workers and parallel apply

From: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
To: Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-12-01 10:16:58
Message-ID: OS0PR01MB57167BF64FC0891734C8E81A94149@OS0PR01MB5716.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Thursday, December 1, 2022 3:58 PM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
>
> On Wed, Nov 30, 2022 at 10:51 PM houzj(dot)fnst(at)fujitsu(dot)com
> <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> >
> > On Wednesday, November 30, 2022 9:41 PM houzj(dot)fnst(at)fujitsu(dot)com
> <houzj(dot)fnst(at)fujitsu(dot)com> wrote:
> > >
> > > On Tuesday, November 29, 2022 8:34 PM Amit Kapila
> > > > Review comments on v53-0001*
> > >
> > > Attach the new version patch set.
> >
> > Sorry, there were some mistakes in the previous patch set.
> > Here is the correct V54 patch set. I also ran pgindent for the patch set.
> >
>
> Thank you for updating the patches. Here are random review comments for
> 0001 and 0002 patches.

Thanks for the comments!

>
> ereport(ERROR,
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> errmsg("logical replication parallel apply worker exited
> abnormally"),
> errcontext("%s", edata.context))); and
>
> ereport(ERROR,
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> errmsg("logical replication parallel apply worker exited
> because of subscription information change")));
>
> I'm not sure ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE is appropriate
> here. Given that parallel apply worker has already reported the error message
> with the error code, I think we don't need to set the errorcode for the logs
> from the leader process.
>
> Also, I'm not sure the term "exited abnormally" is appropriate since we use it
> when the server crashes for example. I think ERRORs reported here don't mean
> that in general.

How about reporting "xxx worker exited due to error" instead?
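Something along the following lines in the leader, dropping the error code as
you suggested (just a sketch of the wording, not final code):

	ereport(ERROR,
			(errmsg("logical replication parallel apply worker exited due to error"),
			 errcontext("%s", edata.context)));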

> ---
> if (am_parallel_apply_worker() && on_subinfo_change) {
> /*
> * If a parallel apply worker exits due to the subscription
> * information change, we notify the leader apply worker so that the
> * leader can report more meaningful message in time and restart the
> * logical replication.
> */
> pq_putmessage('X', NULL, 0);
> }
>
> and
>
> ereport(ERROR,
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> errmsg("logical replication parallel apply worker exited
> because of subscription information change")));
>
> Do we really need an additional message in case of 'X'? When we call
> apply_worker_clean_exit with on_subinfo_change = true, we have reported the
> error message such as:
>
> ereport(LOG,
> (errmsg("logical replication parallel apply worker for subscription
> \"%s\" will stop because of a parameter change",
> MySubscription->name)));
>
> I think that reporting a similar message from the leader might not be
> meaningful for users.

The intention is to let the leader report a more meaningful message when a
worker exits due to a subscription information change. Otherwise, the leader
is likely to report an error like "lost connection ... to parallel apply
worker" the next time it tries to send data via shared memory after the worker
has exited. What do you think?
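
For illustration, the leader-side handling I have in mind is roughly like the
following (names such as "mtype" are simplified here; this is not the actual
patch code):

	/*
	 * Sketch only: when the leader reads a message from the parallel apply
	 * worker's queue, treat 'X' as a graceful exit caused by a subscription
	 * information change and report that instead of the generic "lost
	 * connection" error.
	 */
	if (mtype == 'X')
		ereport(ERROR,
				(errmsg("logical replication parallel apply worker exited because of subscription information change")));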

> ---
> - if (options->proto.logical.streaming &&
> - PQserverVersion(conn->streamConn) >= 140000)
> - appendStringInfoString(&cmd, ", streaming 'on'");
> + if (options->proto.logical.streaming_str)
> + appendStringInfo(&cmd, ", streaming '%s'",
> +
> options->proto.logical.streaming_str);
>
> and
>
> + /*
> + * Assign the appropriate option value for streaming option
> according to
> + * the 'streaming' mode and the publisher's ability to
> support that mode.
> + */
> + if (server_version >= 160000 &&
> + MySubscription->stream == SUBSTREAM_PARALLEL)
> + {
> + options.proto.logical.streaming_str = pstrdup("parallel");
> + MyLogicalRepWorker->parallel_apply = true;
> + }
> + else if (server_version >= 140000 &&
> + MySubscription->stream != SUBSTREAM_OFF)
> + {
> + options.proto.logical.streaming_str = pstrdup("on");
> + MyLogicalRepWorker->parallel_apply = false;
> + }
> + else
> + {
> + options.proto.logical.streaming_str = NULL;
> + MyLogicalRepWorker->parallel_apply = false;
> + }
>
> This change moves the code of adjustment of the streaming option based on
> the publisher server version from libpqwalreceiver.c to worker.c.
> On the other hand, the similar logic for other parameters such as "two_phase"
> and "origin" are still done in libpqwalreceiver.c. How about passing
> MySubscription->stream via WalRcvStreamOptions and constructing a
> streaming option string in libpqrcv_startstreaming()?
> In ApplyWorkerMain(), we just need to set
> MyLogicalRepWorker->parallel_apply = true if (server_version >= 160000
> && MySubscription->stream == SUBSTREAM_PARALLEL). We won't need
> pstrdup for "parallel" and "on", and it's more consistent with other parameters.

Thanks for the suggestion. I thought about the same idea before, but it seems
we would need to include "pg_subscription.h" in libpqwalreceiver.c.
libpqwalreceiver.c looks like a fairly generic place, so I am not sure it is a
good idea to expose the details of the streaming option there.
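
For reference, constructing the option string inside libpqrcv_startstreaming()
would look roughly like the below (a sketch only; I am assuming a "streaming"
enum field in WalRcvStreamOptions), which is what makes me hesitant, since it
pulls the SUBSTREAM_* values into that file:

	if (PQserverVersion(conn->streamConn) >= 160000 &&
		options->proto.logical.streaming == SUBSTREAM_PARALLEL)
		appendStringInfoString(&cmd, ", streaming 'parallel'");
	else if (PQserverVersion(conn->streamConn) >= 140000 &&
			 options->proto.logical.streaming != SUBSTREAM_OFF)
		appendStringInfoString(&cmd, ", streaming 'on'");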

> ---
> + * We maintain a worker pool to avoid restarting workers for each
> + streaming
> + * transaction. We maintain each worker's information in the
>
> Do we need to describe the pool in the doc?

I thought the worker pool is kind of internal information. Maybe we can
document it later if we receive some feedback about it after pushing the main
patch.

> ---
> + * in AccessExclusive mode at transaction finish commands
> + (STREAM_COMMIT and
> + * STREAM_PREAPRE) and release it immediately.
>
> typo, s/STREAM_PREAPRE/STREAM_PREPARE/

Will change.

> ---
> +/* Parallel apply workers hash table (initialized on first use). */
> +static HTAB *ParallelApplyWorkersHash = NULL;
> +
> +/*
> + * A list to maintain the active parallel apply workers. The
> +information for
> + * the new worker is added to the list after successfully launching it.
> +The
> + * list entry is removed if there are already enough workers in the
> +worker
> + * pool either at the end of the transaction or while trying to find a
> +free
> + * worker for applying the transaction. For more information about the
> +worker
> + * pool, see comments atop this file.
> + */
> +static List *ParallelApplyWorkersList = NIL;
>
> The names ParallelApplyWorkersHash and ParallelWorkersList are very similar
> but the usages are completely different. Probably we can find better names
> such as ParallelApplyTxnHash and ParallelApplyWorkerPool.
> And probably we can add more comments for ParallelApplyWorkersHash.

Will change.

> ---
> if (winfo->serialize_changes ||
> napplyworkers > (max_parallel_apply_workers_per_subscription / 2)) {
> int slot_no;
> uint16 generation;
>
> SpinLockAcquire(&winfo->shared->mutex);
> generation = winfo->shared->logicalrep_worker_generation;
> slot_no = winfo->shared->logicalrep_worker_slot_no;
> SpinLockRelease(&winfo->shared->mutex);
>
> logicalrep_pa_worker_stop(slot_no, generation);
>
> pa_free_worker_info(winfo);
>
> return true;
> }
>
> /* Unlink any files that were needed to serialize partial changes. */ if
> (winfo->serialize_changes)
> stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
>
> If winfo->serialize_changes is true, we return true in the first if statement. So
> stream_cleanup_files in the second if statement is never executed.

pa_free_worker_info() will also clean up the fileset. But I think I can move
that stream_cleanup_files() call before the "... napplyworkers >
(max_parallel_apply_workers_per_subscription / 2)" check so that it is
clearer.
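
I.e., roughly like this (a sketch of the reordering, reusing the code you
quoted):

	/* Unlink any files that were needed to serialize partial changes. */
	if (winfo->serialize_changes)
		stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);

	if (winfo->serialize_changes ||
		napplyworkers > (max_parallel_apply_workers_per_subscription / 2))
	{
		int			slot_no;
		uint16		generation;

		SpinLockAcquire(&winfo->shared->mutex);
		generation = winfo->shared->logicalrep_worker_generation;
		slot_no = winfo->shared->logicalrep_worker_slot_no;
		SpinLockRelease(&winfo->shared->mutex);

		logicalrep_pa_worker_stop(slot_no, generation);

		pa_free_worker_info(winfo);

		return true;
	}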

> ---
> + /*
> + * First, try to get a parallel apply worker from the pool,
> if available.
> + * Otherwise, try to start a new parallel apply worker.
> + */
> + winfo = pa_get_available_worker();
> + if (!winfo)
> + {
> + winfo = pa_init_and_launch_worker();
> + if (!winfo)
> + return;
> + }
>
> I think we don't necessarily need to separate two functions for getting a worker
> from the pool and launching a new worker. It seems to reduce the readability.
> Instead, I think that we can have one function that returns winfo if there is a free
> worker in the worker pool or it launches a worker. That way, we can simply do
> like:
>
> winfo = pg_launch_parallel_worker()
> if (!winfo)
> return;

Will change.
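Something roughly like the following (a sketch; the ParallelApplyWorkerInfo
type name and the two existing helpers are kept just for illustration):

	static ParallelApplyWorkerInfo *
	pa_launch_parallel_worker(void)
	{
		ParallelApplyWorkerInfo *winfo;

		/* First, try to get a free worker from the pool, if available. */
		winfo = pa_get_available_worker();
		if (winfo)
			return winfo;

		/* Otherwise, try to start a new parallel apply worker. */
		return pa_init_and_launch_worker();
	}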

> ---
> + /* Setup replication origin tracking. */
> + StartTransactionCommand();
> + ReplicationOriginNameForLogicalRep(MySubscription->oid,
> + InvalidOid,
> +
> originname, sizeof(originname));
> + originid = replorigin_by_name(originname, true);
> + if (!OidIsValid(originid))
> + originid = replorigin_create(originname);
>
> This code looks to allow parallel workers to use different origins in cases where
> the origin doesn't exist, but is that okay? Shouldn't we pass miassing_ok = false
> in this case?
>

Will change.
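I.e., pass missing_ok = false so the parallel apply worker reuses the origin
set up by the leader and fails if it does not exist, instead of creating a new
one (sketch only):

	originid = replorigin_by_name(originname, false);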

> ---
> cfbot seems to fails:
>
> https://cirrus-ci.com/task/6264595342426112

Thanks for reporting. It's due to a test case problem; I will fix that test soon.

Best regards,
Hou zj
