Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-11-25 02:53:37
Message-ID: CAHut+PtQ9H33pe9+aw5tcqP2yfhm5k8wTxdRgnn8GZkrEV2x9A@mail.gmail.com
Lists: pgsql-hackers

Here are some review comments for v51-0001.

======

.../replication/logical/applyparallelworker.c

1. General - Error messages, get_worker_name()

I previously wrote a comment to ask if the get_worker_name() should be
used in more places but the reply [1, #2b] was:

> 2b.
> Consider if maybe all of these ought to be calling get_worker_name()
> which is currently static in worker.c. Doing this means any future
> changes to get_worker_name won't cause more inconsistencies.

Most error messages in applyparallelxx.c can only use "xx parallel
worker", so I think it's fine not to call get_worker_name.

~

I thought the reply missed the point I was trying to make. I meant that
if it was arranged now so that *every* message went via
get_worker_name(), then if somebody in future wanted to change the
names (e.g. from "logical replication parallel apply worker" to "LR PA
worker"), they would only need to change them in one central place
instead of hunting down every hardwired error message.

Anyway, you can do it how you want -- I just was not sure you'd got my
original point.

~~~

2. HandleParallelApplyMessage

+ case 'X': /* Terminate, indicating clean exit. */
+ shm_mq_detach(winfo->error_mq_handle);
+ winfo->error_mq_handle = NULL;
+ break;
+ default:
+ elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
+ msgtype, msg->len);

The code under case 'X' is indented too much.

======

src/backend/replication/logical/origin.c

3. replorigin_session_setup(RepOriginId node, int acquired_by)

@@ -1075,12 +1075,20 @@ ReplicationOriginExitCleanup(int code, Datum arg)
* array doesn't have to be searched when calling
* replorigin_session_advance().
*
- * Obviously only one such cached origin can exist per process and the current
+ * Normally only one such cached origin can exist per process and the current
* cached value can only be set again after the previous value is torn down
* with replorigin_session_reset().
+ *
+ * However, we do allow multiple processes to point to the same origin slot if
+ * requested by the caller by passing PID of the process that has already
+ * acquired it as acquired_by. This is to allow multiple parallel apply
+ * processes to use the same origin, provided they maintain commit order, for
+ * example, by allowing only one process to commit at a time. For the first
+ * process requesting this origin, the acquired_by parameter needs to be set to
+ * 0.
*/
void
-replorigin_session_setup(RepOriginId node)
+replorigin_session_setup(RepOriginId node, int acquired_by)

I think the meaning of acquired_by=0 is not fully described here:
"For the first process requesting this origin, the acquired_by
parameter needs to be set to 0."
IMO that describes it only from the POV that you are always going to
want to allow multiple processes. But really this is an optional
feature, so you might pass acquired_by=0 not just because this is the
first of multiple processes, but also because you *never* want to
allow multiple processes at all. The comment does not convey this
meaning.

Maybe something worded like the following would be better?

SUGGESTION
Normally only one such cached origin can exist per process so the
cached value can only be set again after the previous value is torn
down with replorigin_session_reset(). For this normal case pass
acquired_by=0 (meaning the slot is not allowed to be already acquired
by another process).

However, sometimes multiple processes can safely re-use the same
origin slot (for example, multiple parallel apply processes can safely
use the same origin, provided they maintain commit order by allowing
only one process to commit at a time). For this case the first process
must pass acquired_by=0, and then the other processes sharing that
same origin can pass acquired_by=PID of the first process.

======

src/backend/replication/logical/worker.c

4. GENERAL - get_worker_name()

If you decide it is OK to hardwire some error messages instead of
unconditionally calling get_worker_name() (see my review comment #1 in
this post), then there are some other messages in this file that could
also be hardwired because the type of worker is already known.

Here are some examples:

4a.

+ else if (am_parallel_apply_worker())
+ {
+ if (rel->state != SUBREL_STATE_READY)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ /* translator: first %s is the name of logical replication worker */
+ errmsg("%s for subscription \"%s\" will stop",
+ get_worker_name(), MySubscription->name),
+ errdetail("Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
+
+ return true;
+ }

In the above code from should_apply_changes_for_rel we already know
this is a parallel apply worker.

~

4b.

+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\" will stop because of a parameter change",
+ get_worker_name(), MySubscription->name)));
+ else

In the above code from maybe_reread_subscription we already know this
is a parallel apply worker.

~

4c.

if (am_tablesync_worker())
ereport(LOG,
- (errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
- MySubscription->name, get_rel_name(MyLogicalRepWorker->relid))));
+ /* translator: first %s is the name of logical replication worker */
+ (errmsg("%s for subscription \"%s\", table \"%s\" has started",
+ get_worker_name(), MySubscription->name,
+ get_rel_name(MyLogicalRepWorker->relid))));

In the above code from ApplyWorkerMain we already know this is a
tablesync worker.

~~~

5. get_transaction_apply_action

+
+/*
+ * Return the action to take for the given transaction. *winfo is assigned to
+ * the destination parallel worker info (if the action is
+ * TRANS_LEADER_SEND_TO_PARALLEL, otherwise *winfo is assigned NULL.
+ */
+static TransApplyAction
+get_transaction_apply_action(TransactionId xid,
+ ParallelApplyWorkerInfo **winfo)

There is no closing ')' in the function comment.

~~~

6. apply_worker_clean_exit

+ /* Notify the leader apply worker that we have exited cleanly. */
+ if (am_parallel_apply_worker())
+ pq_putmessage('X', NULL, 0);

IMO the comment would be better inside the if block.

SUGGESTION
if (am_parallel_apply_worker())
{
/* Notify the leader apply worker that we have exited cleanly. */
pq_putmessage('X', NULL, 0);
}

------

[1] Hou-san's reply to my v49-0001 review.
https://www.postgresql.org/message-id/OS0PR01MB5716339FF7CB759E751492CB940D9%40OS0PR01MB5716.jpnprd01.prod.outlook.com

Kind Regards,
Peter Smith.
Fujitsu Australia
