From: | Peter Smith <smithpb2250(at)gmail(dot)com> |
---|---|
To: | "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com> |
Cc: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org> |
Subject: | Re: Perform streaming logical transactions by background workers and parallel apply |
Date: | 2022-10-24 06:11:38 |
Message-ID: | CAHut+PsJWHRoRzXtMrJ1RaxmkS2LkiMR_4S2pSionxXmYsyOww@mail.gmail.com |
Lists: | pgsql-hackers |
Here are my review comments for v40-0001.
======
src/backend/replication/logical/worker.c
1. should_apply_changes_for_rel
+ else if (am_parallel_apply_worker())
+ {
+ if (rel->state != SUBREL_STATE_READY)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical replication parallel apply worker for subscription \"%s\" will stop",
+ MySubscription->name),
+ errdetail("Cannot handle streamed replication transaction using parallel "
+ "apply workers until all tables are synchronized.")));
1a.
"transaction" -> "transactions"
1b.
"are synchronized" -> "have been synchronized."
e.g. "Cannot handle streamed replication transactions using parallel
apply workers until all tables have been synchronized."
~~~
2. maybe_reread_subscription
+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was removed",
+ MySubscription->name)));
Maybe there is an easier way to code this instead of if/else and
cut/paste message text:
SUGGESTION
ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will stop because the subscription was removed",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));
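To show the idea outside the server code, here is a minimal standalone sketch. It uses snprintf instead of ereport/errmsg, and the names demo_is_parallel_worker, demo_worker_kind and demo_format_removed_msg are hypothetical stand-ins invented for this illustration, not functions from the patch:

```c
#include <stdbool.h>
#include <stdio.h>

/* Hypothetical stand-in for am_parallel_apply_worker(); not the real function. */
static bool demo_is_parallel_worker = false;

/* Hypothetical helper: pick the worker description once, in one place. */
static const char *
demo_worker_kind(void)
{
	return demo_is_parallel_worker ? "parallel apply worker" : "apply worker";
}

/*
 * Build the "subscription was removed" message with a single format string.
 * Returns the value of snprintf (length the full message would need).
 */
static int
demo_format_removed_msg(char *buf, size_t buflen, const char *subname)
{
	return snprintf(buf, buflen,
					"logical replication %s for subscription \"%s\" will stop "
					"because the subscription was removed",
					demo_worker_kind(), subname);
}
```

The message text then exists only once, so a later wording change cannot leave the two branches out of sync.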
~~~
3.
+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will "
+ "stop because the subscription was disabled",
+ MySubscription->name)));
These can be combined like comment #2 above.
SUGGESTION
ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will stop because the subscription was disabled",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));
~~~
4.
+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
+ MySubscription->name)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription \"%s\" will restart because of a parameter change",
+ MySubscription->name)));
These can be combined like comment #2 above
SUGGESTION
ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" will restart because of a parameter change",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));
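Note that the two messages in this hunk differ in more than the worker name: the parallel apply worker message says "will stop" while the apply worker message says "will restart". If that difference is intentional, a combined message would need to select the verb as well. A minimal standalone sketch of that variant, using snprintf rather than ereport and a hypothetical function name (demo_format_param_change_msg) invented for this illustration:

```c
#include <stdbool.h>
#include <stdio.h>

/*
 * Hypothetical helper: build the "parameter change" message, choosing both
 * the worker description and the verb from the worker type.
 */
static int
demo_format_param_change_msg(char *buf, size_t buflen,
							 bool is_parallel, const char *subname)
{
	return snprintf(buf, buflen,
					"logical replication %s for subscription \"%s\" will %s "
					"because of a parameter change",
					is_parallel ? "parallel apply worker" : "apply worker",
					subname,
					is_parallel ? "stop" : "restart");
}
```

If the difference in verbs is not intentional, the single-ternary form is enough.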
~~~
4. InitializeApplyWorker
+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));
+ else
+ ereport(LOG,
+ (errmsg("logical replication apply worker for subscription %u will not "
+ "start because the subscription was removed during startup",
+ MyLogicalRepWorker->subid)));
These can be combined like comment #2 above
SUGGESTION
ereport(LOG,
(errmsg("logical replication %s for subscription %u will not start because the subscription was removed during startup",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MyLogicalRepWorker->subid)));
~~~
5.
+ else if (am_parallel_apply_worker())
+ ereport(LOG,
+ (errmsg("logical replication parallel apply worker for subscription \"%s\" has started",
+ MySubscription->name)));
else
ereport(LOG,
(errmsg("logical replication apply worker for subscription \"%s\" has started",
MySubscription->name)));
The last if/else can be combined in the same way as comment #2 above.
SUGGESTION
else
ereport(LOG,
(errmsg("logical replication %s for subscription \"%s\" has started",
am_parallel_apply_worker() ? "parallel apply worker" : "apply worker",
MySubscription->name)));
~~~
6. IsLogicalParallelApplyWorker
+bool
+IsLogicalParallelApplyWorker(void)
+{
+ return IsLogicalWorker() && am_parallel_apply_worker();
+}
Patch v40 added the IsLogicalWorker() to the condition, but why is
that extra check necessary?
======
7. src/include/replication/worker_internal.h
+typedef struct ParallelApplyWorkerInfo
+{
+ shm_mq_handle *mq_handle;
+
+ /*
+ * The queue used to transfer messages from the parallel apply worker to
+ * the leader apply worker.
+ */
+ shm_mq_handle *error_mq_handle;
In patch v40 the comment about the NULL error_mq_handle is removed,
but since the code still explicitly sets/checks NULL in different
places, isn't it still better to have some comment here describing
what NULL means?
------
Kind Regards,
Peter Smith.
Fujitsu Australia