From: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> |
---|---|
To: | "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com> |
Cc: | Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org> |
Subject: | Re: Perform streaming logical transactions by background workers and parallel apply |
Date: | 2022-11-04 08:06:42 |
Message-ID: | CAA4eK1+UK0eN9hqU1JqY5WR5-YNbh6_2t8Zvd3bXpViQSE2+Rw@mail.gmail.com |
Lists: | pgsql-hackers |
On Thu, Nov 3, 2022 at 6:36 PM houzj(dot)fnst(at)fujitsu(dot)com
<houzj(dot)fnst(at)fujitsu(dot)com> wrote:
>
> Thanks for the analysis and summary !
>
> I tried to implement the above idea and here is the patch set.
>
A few comments on v42-0001:
===========================
1.
+ /*
+ * Set the xact_state flag in the leader instead of the
+ * parallel apply worker to avoid the race condition where the leader has
+ * already started waiting for the parallel apply worker to finish
+ * processing the transaction while the child process has not yet
+ * processed the first STREAM_START and has not set the
+ * xact_state to true.
+ */
+ SpinLockAcquire(&winfo->shared->mutex);
+ winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
The comments and code for xact_state don't seem to match.
2.
+ * progress. This could happend as we don't wait for transaction rollback
+ * to finish.
+ */
/happend/happen
3.
+/* Helper function to release a lock with lockid */
+void
+parallel_apply_lock(uint16 lockid)
...
...
+/* Helper function to take a lock with lockid */
+void
+parallel_apply_unlock(uint16 lockid)
Here, the comments seem to be reversed.
4.
+parallel_apply_lock(uint16 lockid)
+{
+ MemoryContext oldcontext;
+
+ if (list_member_int(ParallelApplyLockids, lockid))
+ return;
+
+ LockSharedObjectForSession(SubscriptionRelationId, MySubscription->oid,
+ lockid, am_leader_apply_worker() ?
+ AccessExclusiveLock:
+ AccessShareLock);
This appears odd to me because it forecloses the option of the parallel
apply worker ever acquiring this lock in exclusive mode. I think it
would be better to have lock_mode as one of the parameters of this
API.
5.
+ * Inintialize fileset if not yet and open the file.
+ */
+void
+serialize_stream_start(TransactionId xid, bool first_segment)
Typo. /Inintialize/Initialize
6.
parallel_apply_setup_dsm()
{
...
+ shared->xact_state = false;
xact_state should be set to one of the values of ParallelTransState.
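This sketch covers points 1 and 6. It assumes the intent is that the leader resets the state and the worker advances it once it handles the first STREAM_START. Under that assumption, both the setup code and the comments can use ParallelTransState values. This is plain C, not PostgreSQL code. PARALLEL_TRANS_FINISHED, DemoWorkerShared and the demo_* functions are assumptions, and a pthread mutex stands in for the spinlock.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical mirror of ParallelTransState; FINISHED is assumed here. */
typedef enum
{
	PARALLEL_TRANS_UNKNOWN,
	PARALLEL_TRANS_STARTED,
	PARALLEL_TRANS_FINISHED
} ParallelTransState;

typedef struct
{
	pthread_mutex_t mutex;			/* stands in for the shared spinlock */
	ParallelTransState xact_state;
} DemoWorkerShared;

/* Shared-memory setup: use an enum value rather than "false". */
static void
demo_setup_shared(DemoWorkerShared *shared)
{
	pthread_mutex_init(&shared->mutex, NULL);
	shared->xact_state = PARALLEL_TRANS_UNKNOWN;
}

/*
 * Leader, before sending the first STREAM_START of a transaction: reset the
 * state so that a later wait cannot see a stale value left over from a
 * previous transaction.
 */
static void
demo_leader_begin_xact(DemoWorkerShared *shared)
{
	pthread_mutex_lock(&shared->mutex);
	shared->xact_state = PARALLEL_TRANS_UNKNOWN;
	pthread_mutex_unlock(&shared->mutex);
}

/* Worker, on processing that first STREAM_START. */
static void
demo_worker_first_message(DemoWorkerShared *shared)
{
	pthread_mutex_lock(&shared->mutex);
	assert(shared->xact_state == PARALLEL_TRANS_UNKNOWN);
	shared->xact_state = PARALLEL_TRANS_STARTED;
	pthread_mutex_unlock(&shared->mutex);
}

/* Read the state under the lock. */
static ParallelTransState
demo_get_xact_state(DemoWorkerShared *shared)
{
	ParallelTransState state;

	pthread_mutex_lock(&shared->mutex);
	state = shared->xact_state;
	pthread_mutex_unlock(&shared->mutex);
	return state;
}
```

Assigning false to such a field compiles. In this sketch it even yields PARALLEL_TRANS_UNKNOWN, because that is the first enumerator, but it hides which state is meant.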
7.
/*
+ * Don't use SharedFileSet here because the fileset is shared by the leader
+ * worker and the fileset in leader need to survive after releasing the
+ * shared memory
This comment seems a bit unclear to me. Should there be an 'and' between
'leader' and 'worker'? If so, the following 'and' won't make sense.
8.
+apply_handle_stream_stop(StringInfo s)
{
...
+ case TRANS_PARALLEL_APPLY:
+
+ /*
+ * If there is no message left, wait for the leader to release the
+ * lock and send more messages.
+ */
+ if (pg_atomic_sub_fetch_u32(&(MyParallelShared->left_message), 1) == 0)
+ parallel_apply_lock(MyParallelShared->stream_lock_id);
As per Sawada-San's email [1], this lock should be released
immediately after it is acquired. If we do so, then we don't need to
unlock separately in apply_handle_stream_start() in the code below, or
at similar places in stream_prepare, stream_commit, and stream_abort.
Is there a reason for doing it differently?
apply_handle_stream_start(StringInfo s)
{
...
+ case TRANS_PARALLEL_APPLY:
...
+ /*
+ * Unlock the shared object lock so that the leader apply worker
+ * can continue to send changes.
+ */
+ parallel_apply_unlock(MyParallelShared->stream_lock_id);
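This is a minimal sketch of the "acquire and release immediately" pattern for point 8. It is plain C, with a pthread mutex standing in for the stream lock and C11 atomics standing in for pg_atomic_sub_fetch_u32(). demo_shared, demo_shared_init(), demo_sub_fetch() and demo_worker_message_done() are hypothetical. The sketch assumes the leader holds the lock whenever the worker has to wait for more data.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

typedef struct
{
	atomic_uint left_message;		/* messages sent but not yet applied */
	pthread_mutex_t stream_lock;	/* held by the leader while the worker must wait */
} demo_shared;

static void
demo_shared_init(demo_shared *sh, unsigned pending)
{
	atomic_init(&sh->left_message, pending);
	pthread_mutex_init(&sh->stream_lock, NULL);
}

/* Analogue of pg_atomic_sub_fetch_u32(): returns the new value. */
static unsigned
demo_sub_fetch(atomic_uint *v, unsigned n)
{
	return atomic_fetch_sub(v, n) - n;	/* atomic_fetch_sub returns the old value */
}

/*
 * Called by the worker after applying one message.  When nothing is left,
 * wait for the leader by acquiring the stream lock and releasing it at once,
 * so no separate unlock is needed later (e.g. at the next stream start).
 * Returns 1 if it went through the wait, 0 otherwise.
 */
static int
demo_worker_message_done(demo_shared *sh)
{
	assert(atomic_load(&sh->left_message) > 0);

	if (demo_sub_fetch(&sh->left_message, 1) == 0)
	{
		pthread_mutex_lock(&sh->stream_lock);	/* blocks while the leader holds it */
		pthread_mutex_unlock(&sh->stream_lock);	/* release immediately */
		return 1;
	}
	return 0;
}
```

Because the worker never keeps the lock after waking up, apply_handle_stream_start() and the stream_prepare, stream_commit and stream_abort paths would have nothing left to unlock.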
9.
+parallel_apply_spooled_messages(void)
{
...
+ if (fileset_valid)
+ {
+ in_streamed_transaction = false;
+
+ parallel_apply_lock(MyParallelShared->transaction_lock_id);
Is there a reason to acquire this lock here if the parallel apply
worker will acquire it at stream_start?
10.
+ winfo->shared->stream_lock_id = parallel_apply_get_unique_id();
+ winfo->shared->transaction_lock_id = parallel_apply_get_unique_id();
Why can't we use xid (remote_xid) for one of these and local_xid (the one
generated by the parallel apply worker) for the other? I was a bit worried
about local_xid because it will be generated only after applying the first
message, but the patch already seems to wait for the worker to handle the
first message in parallel_apply_wait_for_xact_finish, as seen in the code below.
+void
+parallel_apply_wait_for_xact_finish(ParallelApplyWorkerShared *wshared)
+{
+ /*
+ * Wait until the parallel apply worker handles the first message and
+ * set the flag to true.
+ */
+ parallel_apply_wait_for_in_xact(wshared, PARALLEL_TRANS_STARTED);
+
+ /* Wait for the transaction lock to be released. */
+ parallel_apply_lock(wshared->transaction_lock_id);
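For point 10, here is a minimal sketch of the two-step wait in parallel_apply_wait_for_xact_finish(). It is plain C with pthreads, not PostgreSQL code. Step 1 waits until the worker reports PARALLEL_TRANS_STARTED. Step 2 blocks on the transaction lock that the worker holds while applying. The struct, the demo_* functions and PARALLEL_TRANS_FINISHED are assumptions. A condition variable stands in for however parallel_apply_wait_for_in_xact() actually waits.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical mirror of ParallelTransState; FINISHED is assumed here. */
typedef enum
{
	PARALLEL_TRANS_UNKNOWN,
	PARALLEL_TRANS_STARTED,
	PARALLEL_TRANS_FINISHED
} ParallelTransState;

typedef struct
{
	pthread_mutex_t mutex;				/* protects xact_state */
	pthread_cond_t cv;					/* signalled when xact_state changes */
	ParallelTransState xact_state;
	pthread_mutex_t transaction_lock;	/* held by the worker while applying */
	int			applied_changes;		/* demo payload written by the worker */
} DemoShared;

static void
demo_init(DemoShared *s)
{
	pthread_mutex_init(&s->mutex, NULL);
	pthread_cond_init(&s->cv, NULL);
	s->xact_state = PARALLEL_TRANS_UNKNOWN;
	pthread_mutex_init(&s->transaction_lock, NULL);
	s->applied_changes = 0;
}

/* Worker: take the transaction lock *before* announcing STARTED. */
static void
demo_worker_begin(DemoShared *s)
{
	pthread_mutex_lock(&s->transaction_lock);

	pthread_mutex_lock(&s->mutex);
	s->xact_state = PARALLEL_TRANS_STARTED;
	pthread_cond_broadcast(&s->cv);
	pthread_mutex_unlock(&s->mutex);
}

/* Worker: mark the transaction finished, then release the lock. */
static void
demo_worker_end(DemoShared *s)
{
	pthread_mutex_lock(&s->mutex);
	assert(s->xact_state == PARALLEL_TRANS_STARTED);
	s->xact_state = PARALLEL_TRANS_FINISHED;
	pthread_cond_broadcast(&s->cv);
	pthread_mutex_unlock(&s->mutex);

	pthread_mutex_unlock(&s->transaction_lock);
}

/* Leader: sketch of the wait done by parallel_apply_wait_for_xact_finish(). */
static void
demo_wait_for_xact_finish(DemoShared *s)
{
	/* Step 1: wait until the worker has handled its first message. */
	pthread_mutex_lock(&s->mutex);
	while (s->xact_state == PARALLEL_TRANS_UNKNOWN)
		pthread_cond_wait(&s->cv, &s->mutex);
	pthread_mutex_unlock(&s->mutex);

	/* Step 2: block until the worker releases the transaction lock. */
	pthread_mutex_lock(&s->transaction_lock);
	pthread_mutex_unlock(&s->transaction_lock);
}

/* Demo worker thread: start, apply three "changes", finish. */
static void *
demo_worker_main(void *arg)
{
	DemoShared *s = arg;

	demo_worker_begin(s);
	s->applied_changes = 3;
	demo_worker_end(s);
	return NULL;
}
```

The ordering in demo_worker_begin() is what makes step 1 necessary. If the leader tried to take the transaction lock before the worker had acquired it, the leader could get the lock immediately and return too early. The same reasoning holds whichever identifier (remote xid, local xid, or a generated id) ends up naming the lock.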
--
With Regards,
Amit Kapila.