RE: Perform streaming logical transactions by background workers and parallel apply

From: "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>
To: Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: RE: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-06-28 03:23:10
Message-ID: OS3PR01MB6275DCCDF35B3BBD52CA02CC9EB89@OS3PR01MB6275.jpnprd01.prod.outlook.com
Lists: pgsql-hackers

On Mon, Jun 21, 2022 at 9:41 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> Here are some review comments for the v11-0001 patch.
>
> (I will review the remaining patches 0002-0005 and post any comments later)
>

Thanks for your comments.

> 6. doc/src/sgml/protocol.sgml
>
> Since there are protocol changes made here, shouldn’t there also be
> some corresponding LOGICALREP_PROTO_XXX constants and special checking
> added in the worker.c?

I think it is okay not to add a new macro, because we just extended the existing
option ("streaming"). We also added a version check in the function
apply_handle_stream_abort.

> 8. doc/src/sgml/ref/create_subscription.sgml
>
> + <para>
> + If set to <literal>on</literal>, the changes of transaction are
> + written to temporary files and then applied at once after the
> + transaction is committed on the publisher.
> + </para>
>
> SUGGESTION
> If set to on, the incoming changes are written to a temporary file and
> then applied only after the transaction is committed on the publisher.

In "on" mode, there may be more than one temporary file for one streaming
transaction (see the calls to BufFileCreateFileSet in the functions
stream_open_file and subxact_info_write), so I think the existing description
might be better.
If you feel this sentence is not clear, I will try to improve it later.
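To illustrate why "on" mode can leave more than one file per streamed
transaction, here is a minimal sketch of the two per-transaction file names.
It assumes the naming used by changes_filename() and subxact_filename() in
worker.c ("<subid>-<xid>.changes" and "<subid>-<xid>.subxacts"); the helper
streamed_xact_filenames() and the SKETCH_MAXPATH limit are hypothetical.

```c
#include <stdio.h>

typedef unsigned int Oid;            /* stand-in for PostgreSQL's Oid */
typedef unsigned int TransactionId;  /* stand-in for PostgreSQL's TransactionId */

#define SKETCH_MAXPATH 64            /* hypothetical buffer size */

/*
 * Hypothetical helper: build the names of the two spool files that
 * "on" mode may create for one streamed transaction.
 */
static void
streamed_xact_filenames(Oid subid, TransactionId xid,
                        char *changes, char *subxacts)
{
    /* Changes file: the streamed changes themselves (stream_open_file) */
    snprintf(changes, SKETCH_MAXPATH, "%u-%u.changes", subid, xid);

    /* Subxact file: subtransaction offsets (subxact_info_write) */
    snprintf(subxacts, SKETCH_MAXPATH, "%u-%u.subxacts", subid, xid);
}
```

As far as I can tell, a streamed transaction that contains subtransactions
therefore has both a changes file and a subxact file, which is why "a
temporary file" would understate it.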

> 10. src/backend/access/transam/xact.c
>
> @@ -1741,6 +1742,13 @@ RecordTransactionAbort(bool isSubXact)
> elog(PANIC, "cannot abort transaction %u, it was already committed",
> xid);
>
> + /*
> + * Are we using the replication origins feature? Or, in other words,
> + * are we replaying remote actions?
> + */
> + replorigin = (replorigin_session_origin != InvalidRepOriginId &&
> + replorigin_session_origin != DoNotReplicateId);
> +
> /* Fetch the data we need for the abort record */
> nrels = smgrGetPendingDeletes(false, &rels);
> nchildren = xactGetCommittedChildren(&children);
> @@ -1765,6 +1773,11 @@ RecordTransactionAbort(bool isSubXact)
> MyXactFlags, InvalidTransactionId,
> NULL);
>
> + if (replorigin)
> + /* Move LSNs forward for this replication origin */
> + replorigin_session_advance(replorigin_session_origin_lsn,
> + XactLastRecEnd);
> +
>
> I did not see any reason why the code assigning the 'replorigin' and
> the code checking the 'replorigin' are separated like they are. I
> thought these 2 new code fragments should be kept together. Perhaps it
> was decided this assignment must be outside the critical section? But
> if that’s the case maybe a comment explaining so would be good.
>
> ~~~
>
> 11. src/backend/access/transam/xact.c
>
> + if (replorigin)
> + /* Move LSNs forward for this replication origin */
> + replorigin_session_advance(replorigin_session_origin_lsn,
> +
>
> The positioning of that comment is unusual. Maybe better before the check?

As Amit-san said in [1], this is just for consistency with the code in the
function RecordTransactionCommit.

> 12. src/backend/commands/subscriptioncmds.c - defGetStreamingMode
>
> + /*
> + * If no parameter given, assume "true" is meant.
> + */
> + if (def->arg == NULL)
> + return SUBSTREAM_ON;
>
> SUGGESTION for comment
> If the streaming parameter is given but no parameter value is
> specified, then assume "true" is meant.

I think it might be better to be consistent with the function defGetBoolean
here.
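As a sketch of the consistency argument: a bare "streaming" option with no
value is treated the same way defGetBoolean treats a bare boolean option,
i.e. as true/on. The enum and parse_streaming_mode() below are hypothetical
stand-ins (SUBSTREAM_INVALID replaces the ereport(ERROR) the real code would
raise), and only the "true"/"false"/"on"/"off" spellings plus "apply" are
handled.

```c
#include <stddef.h>
#include <strings.h>    /* strcasecmp (POSIX) */

typedef enum
{
    SUBSTREAM_OFF,
    SUBSTREAM_ON,
    SUBSTREAM_APPLY,
    SUBSTREAM_INVALID   /* sketch only: the real code would ereport(ERROR) */
} StreamingMode;

/* Hypothetical parser; "value" is NULL when the option has no argument. */
static StreamingMode
parse_streaming_mode(const char *value)
{
    /* Like defGetBoolean: a bare option name means "true". */
    if (value == NULL)
        return SUBSTREAM_ON;

    if (strcasecmp(value, "true") == 0 || strcasecmp(value, "on") == 0)
        return SUBSTREAM_ON;
    if (strcasecmp(value, "false") == 0 || strcasecmp(value, "off") == 0)
        return SUBSTREAM_OFF;
    if (strcasecmp(value, "apply") == 0)
        return SUBSTREAM_APPLY;

    return SUBSTREAM_INVALID;
}
```

So "WITH (streaming)" and "WITH (streaming = on)" end up identical, just as
a bare boolean option does.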

> 24. .../replication/logical/applybgworker.c - LogicalApplyBgwLoop
>
> +/* Apply Background Worker main loop */
> +static void
> +LogicalApplyBgwLoop(shm_mq_handle *mqh, volatile ApplyBgworkerShared *pst)
>
> Why is the name inconsistent with other function names in the file?
> Should it be apply_bgworker_loop?

I think it is better to keep this function name consistent with the function
LogicalRepApplyLoop.

> 28. .../replication/logical/applybgworker.c - LogicalApplyBgwMain
>
> For consistency should it be called apply_bgworker_main?

I think it is better to keep this function name consistent with the function
ApplyWorkerMain.

> 30. src/backend/replication/logical/decode.c
>
> @@ -651,9 +651,10 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
> {
> for (i = 0; i < parsed->nsubxacts; i++)
> {
> - ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
> + ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr,
> + commit_time);
> }
> - ReorderBufferForget(ctx->reorder, xid, buf->origptr);
> + ReorderBufferForget(ctx->reorder, xid, buf->origptr, commit_time);
>
> ReorderBufferForget was declared with 'abort_time' param. So it makes
> these calls a bit confusing looking to be passing 'commit_time'
>
> Maybe better to do like below and pass 'forget_time' (inside that
> 'if') along with an explanatory comment:
>
> TimestampTz forget_time = commit_time;

I did not change this. I am just not sure how much this will help.

> 36. src/backend/replication/logical/launcher.c -
> logicalrep_apply_background_worker_count
>
> + int res = 0;
> +
>
> A better variable name here would be 'count', or even 'n'.

I think it is better to keep this variable name consistent with the function
logicalrep_sync_worker_count.
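For reference, the counting style in question looks like the sketch below.
WorkerSlot and its fields are hypothetical simplifications; the real
functions iterate over LogicalRepCtx->workers while holding
LogicalRepWorkerLock, which is omitted here.

```c
#include <stdbool.h>
#include <stddef.h>

typedef unsigned int Oid;   /* stand-in for PostgreSQL's Oid */

/* Hypothetical, simplified launcher worker slot. */
typedef struct
{
    bool in_use;
    Oid  subid;
    bool is_apply_bgworker;
} WorkerSlot;

/*
 * Count apply background workers of one subscription, using the same
 * "res" accumulator style as logicalrep_sync_worker_count().
 */
static int
apply_bgworker_count_sketch(const WorkerSlot *slots, size_t nslots, Oid subid)
{
    int res = 0;

    for (size_t i = 0; i < nslots; i++)
    {
        const WorkerSlot *w = &slots[i];

        if (w->in_use && w->subid == subid && w->is_apply_bgworker)
            res++;
    }

    return res;
}
```

Keeping the accumulator named "res" in both counting functions makes them
easy to compare side by side.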

> 38. src/backend/replication/logical/proto.c - logicalrep_read_stream_abort
>
> + /*
> + * If the version of the publisher is lower than the version of the
> + * subscriber, it may not support sending these two fields, so only take
> + * these fields when include_abort_lsn is true.
> + */
> + if (include_abort_lsn)
> + {
> + abort_data->abort_lsn = pq_getmsgint64(in);
> + abort_data->abort_time = pq_getmsgint64(in);
> + }
> + else
> + {
> + abort_data->abort_lsn = InvalidXLogRecPtr;
> + abort_data->abort_time = 0;
> + }
>
> This comment is documenting a decision that was made elsewhere.
>
> But it somehow feels wrong to me that the decision to read or not read
> the abort time/lsn is made by the caller of this function. IMO it
> might make more sense if the server version was simply passed as a
> param and then this function can be in control of its own destiny and
> make the decision does it need to read those extra fields or not. An
> extra member flag can be added to LogicalRepStreamAbortData to
> indicate if abort_data read these values or not.

I understand what you mean. But I am not sure it is appropriate to introduce
version information into proto.c just for the STREAM_ABORT message, and I
think doing so might complicate proto.c. I also think it might not be a good
idea to add a flag to LogicalRepStreamAbortData (there is no similar flag in
any of the LogicalRep.*Data structures).
So I just introduced a flag argument that decides whether we should read these
fields from the STREAM_ABORT message.
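For reference, the flag-controlled read amounts to the following
self-contained sketch. MsgBuf, StreamAbortSketch, read_be() and
read_stream_abort_sketch() are hypothetical stand-ins for StringInfo,
LogicalRepStreamAbortData, pq_getmsgint()/pq_getmsgint64() and
logicalrep_read_stream_abort(); the field order (xid, subxid, then abort_lsn
and abort_time) follows the existing message plus the quoted diff, and 0
plays the role of InvalidXLogRecPtr.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for StringInfo. */
typedef struct
{
    const unsigned char *data;
    size_t len;
    size_t cursor;
} MsgBuf;

/* Hypothetical stand-in for LogicalRepStreamAbortData. */
typedef struct
{
    uint32_t xid;
    uint32_t subxid;
    uint64_t abort_lsn;     /* 0 == InvalidXLogRecPtr */
    int64_t  abort_time;
} StreamAbortSketch;

/* Read an nbytes big-endian integer (network byte order, as pqformat does). */
static uint64_t
read_be(MsgBuf *buf, size_t nbytes)
{
    uint64_t v = 0;

    for (size_t i = 0; i < nbytes; i++)
        v = (v << 8) | buf->data[buf->cursor++];
    return v;
}

static void
read_stream_abort_sketch(MsgBuf *buf, StreamAbortSketch *out,
                         bool include_abort_lsn)
{
    out->xid = (uint32_t) read_be(buf, 4);
    out->subxid = (uint32_t) read_be(buf, 4);

    /* The caller decides whether the publisher sent the two new fields. */
    if (include_abort_lsn)
    {
        out->abort_lsn = read_be(buf, 8);
        out->abort_time = (int64_t) read_be(buf, 8);
    }
    else
    {
        out->abort_lsn = 0;
        out->abort_time = 0;
    }
}
```

The reader itself stays version-agnostic; only the boolean tells it whether
the trailing 16 bytes exist.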

> 41. src/backend/replication/logical/worker.c
>
> -static ApplyErrorCallbackArg apply_error_callback_arg =
> +ApplyErrorCallbackArg apply_error_callback_arg =
> {
> .command = 0,
> .rel = NULL,
> @@ -242,7 +246,7 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
> .origin_name = NULL,
> };
>
> Maybe it is still a good idea to at least keep the old comment here:
> /* Struct for saving and restoring apply errcontext information */

I think the old comment describes the structure ApplyErrorCallbackArg, not the
variable apply_error_callback_arg, so I did not add a new comment for the
variable apply_error_callback_arg here.

> 42. src/backend/replication/logical/worker.c
>
> +/* check if we are applying the transaction in apply background worker */
> +#define apply_bgworker_active() (in_streamed_transaction && stream_apply_worker != NULL)
>
> 42a.
> Uppercase comment.
>
> 42b.
> "in apply background worker" -> "in apply background worker"

=> 42a.
improved as suggested.
=> 42b.
Sorry, I am not sure what you mean.

> 43. src/backend/replication/logical/worker.c - handle_streamed_transaction
>
> @@ -426,41 +437,76 @@ end_replication_step(void)
> }
>
> /*
> - * Handle streamed transactions.
> + * Handle streamed transactions for both main apply worker and apply background
> + * worker.
> *
> - * If in streaming mode (receiving a block of streamed transaction), we
> - * simply redirect it to a file for the proper toplevel transaction.
> + * In streaming case (receiving a block of streamed transaction), for
> + * SUBSTREAM_ON mode, we simply redirect it to a file for the proper toplevel
> + * transaction, and for SUBSTREAM_APPLY mode, we send the changes to background
> + * apply worker (LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes will
> + * also be applied in main apply worker).
> *
> - * Returns true for streamed transactions, false otherwise (regular mode).
> + * For non-streamed transactions, returns false;
> + * For streamed transactions, returns true if in main apply worker (except we
> + * apply streamed transaction in "apply" mode and address
> + * LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes), false otherwise.
> */
>
> Maybe it is accurate (I don’t know), but this header comment seems
> excessively complicated with so many quirks about when to return
> true/false. Can it be reworded into plainer language?

I improved the comments as below:
```
* For non-streamed transactions, returns false;
* For streamed transactions, returns true if in main apply worker, false
* otherwise.
*
* But there are two exceptions: If we apply streamed transaction in main apply
* worker with parallel mode, it will return false when we address
* LOGICAL_REP_MSG_RELATION or LOGICAL_REP_MSG_TYPE changes.
```
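The return-value rules in the new comment can be summarised as a small truth
table. The function below models only that documented return value, under my
reading of the comment (true: the change was spooled to a file or sent to an
apply background worker, so the caller returns; false: the caller applies the
change itself). All names are hypothetical and no spooling is actually done.

```c
#include <stdbool.h>

/* Message kinds relevant to the exception (values are arbitrary here). */
typedef enum
{
    MSG_INSERT,     /* any ordinary data change */
    MSG_RELATION,   /* stands in for LOGICAL_REP_MSG_RELATION */
    MSG_TYPE        /* stands in for LOGICAL_REP_MSG_TYPE */
} MsgKind;

/* Hypothetical model of handle_streamed_transaction()'s return value. */
static bool
streamed_return_value(bool in_streamed_transaction,
                      bool am_main_apply_worker,
                      bool parallel_mode,
                      MsgKind kind)
{
    /* Non-streamed transactions: the caller applies the change. */
    if (!in_streamed_transaction)
        return false;

    /* Apply background worker: it applies the change itself. */
    if (!am_main_apply_worker)
        return false;

    /*
     * Exception: in parallel ("apply") mode the main apply worker also
     * applies RELATION and TYPE messages, so it reports them as not handled.
     */
    if (parallel_mode && (kind == MSG_RELATION || kind == MSG_TYPE))
        return false;

    /* Main apply worker: the change was spooled or forwarded. */
    return true;
}
```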

> 46. src/backend/replication/logical/worker.c - apply_handle_stream_prepare
>
> + /*
> + * This is the main apply worker and the transaction has been
> + * serialized to file, replay all the spooled operations.
> + */
>
> SUGGESTION
> The transaction has been serialized to file. Replay all the spooled operations.

Both #46 and #54 try to improve the same comment. Personally I prefer the
improvement in #54, so I improved this as suggested in #54.

> 50. src/backend/replication/logical/worker.c - apply_handle_stream_abort
>
> + /* Check whether the publisher sends abort_lsn and abort_time. */
> + if (am_apply_bgworker())
> + include_abort_lsn = MyParallelState->server_version >= 150000;
> +
> + logicalrep_read_stream_abort(s, &abort_data, include_abort_lsn);
>
> Here is where I felt maybe just the server version could be passed so
> the logicalrep_read_stream_abort could decide itself what message
> parts needed to be read. Basically it seems strange that the message
> contain parts which might not be read. I felt it is better to always
> read the whole message then later you can choose what parts you are
> interested in.

Please refer to the reply to #38.
In addition, we do not always read these two new fields from the STREAM_ABORT
message, because if the subscriber's version is higher than the publisher's
version, the message may not contain them and we would try to read past the
end of the message. I think that would not be correct behaviour.
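To make the concern concrete, here is a sketch in which the caller derives the
flag from the publisher's version (150000, as in the quoted check) and every
read is bounds-checked. In PostgreSQL the pq_getmsg* routines raise an
"insufficient data left in message" error instead of returning false;
CheckedBuf and the functions below are hypothetical.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for StringInfo. */
typedef struct
{
    const unsigned char *data;
    size_t len;
    size_t cursor;
} CheckedBuf;

/* Big-endian read that refuses to go past the end of the message. */
static bool
read_be_checked(CheckedBuf *buf, size_t nbytes, uint64_t *out)
{
    uint64_t v = 0;

    if (nbytes > buf->len - buf->cursor)
        return false;
    for (size_t i = 0; i < nbytes; i++)
        v = (v << 8) | buf->data[buf->cursor++];
    *out = v;
    return true;
}

/* Only publishers from version 15 on send abort_lsn and abort_time. */
static bool
publisher_sends_abort_lsn(int publisher_version)
{
    return publisher_version >= 150000;
}

/* Returns false if we try to read fields the message does not contain. */
static bool
read_stream_abort_checked(CheckedBuf *buf, int publisher_version,
                          uint64_t *abort_lsn, uint64_t *abort_time)
{
    uint64_t xid, subxid;

    if (!read_be_checked(buf, 4, &xid) || !read_be_checked(buf, 4, &subxid))
        return false;

    if (!publisher_sends_abort_lsn(publisher_version))
    {
        *abort_lsn = 0;     /* InvalidXLogRecPtr */
        *abort_time = 0;
        return true;
    }

    return read_be_checked(buf, 8, abort_lsn) &&
           read_be_checked(buf, 8, abort_time);
}
```

An 8-byte STREAM_ABORT from an older publisher parses fine when the version
gate is respected, but reading the two extra fields unconditionally would run
off the end of it.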

> 63.
>
> I also did a quick check of all the new debug logging added. Here is
> everything from patch v11-0001.
>
> apply_bgworker_free:
> + elog(DEBUG1, "adding finished apply worker #%u for xid %u to the idle list",
> + wstate->pstate->n, wstate->pstate->stream_xid);
>
> LogicalApplyBgwLoop:
> + elog(DEBUG1, "[Apply BGW #%u] ended processing streaming chunk,"
> + "waiting on shm_mq_receive", pst->n);
>
> + elog(DEBUG1, "[Apply BGW #%u] exiting", pst->n);
>
> ApplyBgworkerMain:
> + elog(DEBUG1, "[Apply BGW #%u] started", pst->n);
>
> apply_bgworker_setup:
> + elog(DEBUG1, "setting up apply worker #%u", list_length(ApplyWorkersList) + 1);
>
> apply_bgworker_set_status:
> + elog(DEBUG1, "[Apply BGW #%u] set status to %d", MyParallelState->n, status);
>
> apply_bgworker_subxact_info_add:
> + elog(DEBUG1, "[Apply BGW #%u] defining savepoint %s",
> + MyParallelState->n, spname);
>
> apply_handle_stream_prepare:
> + elog(DEBUG1, "received prepare for streamed transaction %u",
> + prepare_data.xid);
>
> apply_handle_stream_start:
> + elog(DEBUG1, "starting streaming of xid %u", stream_xid);
>
> apply_handle_stream_stop:
> + elog(DEBUG1, "stopped streaming of xid %u, %u changes streamed",
> stream_xid, nchanges);
>
> apply_handle_stream_abort:
> + elog(DEBUG1, "[Apply BGW #%u] aborting current transaction xid=%u, subxid=%u",
> + MyParallelState->n, GetCurrentTransactionIdIfAny(),
> + GetCurrentSubTransactionId());
>
> + elog(DEBUG1, "[Apply BGW #%u] rolling back to savepoint %s",
> + MyParallelState->n, spname);
>
> apply_handle_stream_commit:
> + elog(DEBUG1, "received commit for streamed transaction %u", xid);
>
>
> Observations:
>
> 63a.
> Every new introduced message is at level DEBUG1 (not DEBUG). AFAIK
> this is OK, because the messages are all protocol related and every
> other existing debug message of the current replication worker.c was
> also at the same DEBUG1 level.
>
> 63b.
> The prefix "[Apply BGW #%u]" is used to indicate the bgworker is
> executing the code, but it does not seem to be used 100% consistently
> - e.g. there are some apply_bgworker_XXX functions not using this
> prefix. Is that OK or a mistake?

Thanks for your check. I confirmed this point in v13. There are 5 functions
that do not use the prefix "[Apply BGW #%u]":
```
apply_bgworker_free
apply_bgworker_setup
apply_bgworker_send_data
apply_bgworker_wait_for
apply_bgworker_check_status
```
These 5 functions do not use this prefix because they only output logs in the
main apply worker, so I think it is okay.
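The rule can be stated as a small helper: only code that runs inside an apply
background worker gets the "[Apply BGW #%u]" prefix. format_apply_log() is
hypothetical (the patch writes the prefix directly into each elog() format
string), and bgw_number == 0 is used here to mean "main apply worker".

```c
#include <stdio.h>

/*
 * Hypothetical helper: build a DEBUG1-style line, adding the
 * "[Apply BGW #n]" prefix only when running in an apply background
 * worker (bgw_number > 0); 0 means the main apply worker.
 */
static int
format_apply_log(char *dst, size_t dstlen, unsigned int bgw_number,
                 const char *msg)
{
    if (bgw_number > 0)
        return snprintf(dst, dstlen, "[Apply BGW #%u] %s", bgw_number, msg);
    return snprintf(dst, dstlen, "%s", msg);
}
```

For example, a message from apply_bgworker_setup() (main apply worker) stays
unprefixed, while "exiting" from background worker #3 becomes
"[Apply BGW #3] exiting".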

The rest of the comments have been addressed as suggested.
The new patches are attached to [2].

[1] - https://www.postgresql.org/message-id/CAA4eK1J9_jcLNVqmxt_d28uGi6hAV31wjYdgmg1p8BGuEctNpw%40mail.gmail.com
[2] - https://www.postgresql.org/message-id/OS3PR01MB62758DBE8FA12BA72A43AC819EB89%40OS3PR01MB6275.jpnprd01.prod.outlook.com

Regards,
Wang wei
