Re: Perform streaming logical transactions by background workers and parallel apply

From: Peter Smith <smithpb2250(at)gmail(dot)com>
To: "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>
Cc: "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Perform streaming logical transactions by background workers and parallel apply
Date: 2022-05-18 07:11:04
Message-ID: CAHut+PuAxW57fowiMrn=3=53sagmehiTSW0o1Q52MpR3phUmyw@mail.gmail.com
Lists: pgsql-hackers

Here are my review comments for v6-0001.

======

1. General

I saw that now in most places you are referring to the new kind of
worker as the "apply background worker". But there are a few comments
remaining that still refer to "bgworker". Please search the entire
patch for "bgworker" in the comments and replace them with "apply
background worker".

======

2. Commit message

We also need to allow stream_stop to complete by the
apply background worker to finish it to avoid deadlocks because T-1's current
stream of changes can update rows in conflicting order with T-2's next stream
of changes.

Something is not right with this wording: "to complete by the apply
background worker to finish it...".

Maybe just omit the words "to finish it" (??).

~~~

3. Commit message

This patch also extends the subscription streaming option so that...

SUGGESTION
This patch also extends the SUBSCRIPTION 'streaming' option so that...

======

4. src/backend/commands/subscriptioncmds.c - defGetStreamingMode

+/*
+ * Extract the streaming mode value from a DefElem. This is like
+ * defGetBoolean() but also accepts the special value and "apply".
+ */
+static char
+defGetStreamingMode(DefElem *def)

Typo: "special value and..." -> "special value of..."
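For context, here is a minimal standalone sketch of the kind of parsing that comment describes: boolean spellings plus the special value "apply". The single-char encodings, the STREAMING_* macros and parse_streaming_mode() are hypothetical. The boolean spellings are my approximation of what defGetBoolean() accepts. The real function takes a DefElem and reports bad input with ereport(ERROR) rather than returning a sentinel.

```c
#include <assert.h>
#include <string.h>
#include <strings.h>    /* strcasecmp() (POSIX) */

/* Hypothetical single-char encodings for the 'streaming' option. */
#define STREAMING_OFF   'f'
#define STREAMING_ON    't'
#define STREAMING_APPLY 'a'
#define STREAMING_BAD   '\0'    /* sketch only: stands in for ereport(ERROR) */

/*
 * Extract the streaming mode from an option value. This is like a boolean
 * parser but also accepts the special value of "apply".
 */
static char
parse_streaming_mode(const char *val)
{
    /* No value given: treat it as "true", as defGetBoolean() does. */
    if (val == NULL)
        return STREAMING_ON;

    if (strcmp(val, "1") == 0 || strcasecmp(val, "true") == 0 ||
        strcasecmp(val, "on") == 0)
        return STREAMING_ON;

    if (strcmp(val, "0") == 0 || strcasecmp(val, "false") == 0 ||
        strcasecmp(val, "off") == 0)
        return STREAMING_OFF;

    /* The special extra value. */
    if (strcasecmp(val, "apply") == 0)
        return STREAMING_APPLY;

    return STREAMING_BAD;
}
```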

======

5. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

+
+ if (subworker_dsm == DSM_HANDLE_INVALID)
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");
+ else
+ snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyBgworkerMain");
+
+

5a.
This condition should be using the new 'is_subworker' bool

5b.
Double blank lines?

~~~

6. src/backend/replication/logical/launcher.c - logicalrep_worker_launch

- else
+ else if (subworker_dsm == DSM_HANDLE_INVALID)
snprintf(bgw.bgw_name, BGW_MAXLEN,
"logical replication worker for subscription %u", subid);
+ else
+ snprintf(bgw.bgw_name, BGW_MAXLEN,
+ "logical replication apply worker for subscription %u", subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");

This condition also should be using the new 'is_subworker' bool
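Here is a minimal sketch of how #5a and #6 could look when both choices are driven by the bool instead of comparing subworker_dsm with DSM_HANDLE_INVALID. FakeBackgroundWorker and fill_worker_names() are hypothetical stand-ins for the BackgroundWorker fields set in logicalrep_worker_launch(). BGW_MAXLEN is assumed to be 96, as in bgworker.h. The earlier branch of that if/else chain, which is not shown in the quoted hunk, is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define BGW_MAXLEN 96           /* assumption: mirrors bgworker.h */

typedef unsigned int Oid;

/* Hypothetical stand-in for the relevant BackgroundWorker fields. */
typedef struct
{
    char        bgw_name[BGW_MAXLEN];
    char        bgw_function_name[BGW_MAXLEN];
} FakeBackgroundWorker;

static void
fill_worker_names(FakeBackgroundWorker *bgw, Oid subid, bool is_subworker)
{
    assert(bgw != NULL);

    /* #5a: test the bool rather than the DSM handle. */
    if (is_subworker)
        snprintf(bgw->bgw_function_name, BGW_MAXLEN, "ApplyBgworkerMain");
    else
        snprintf(bgw->bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain");

    /* #6: the same bool selects the worker name. */
    if (is_subworker)
        snprintf(bgw->bgw_name, BGW_MAXLEN,
                 "logical replication apply worker for subscription %u", subid);
    else
        snprintf(bgw->bgw_name, BGW_MAXLEN,
                 "logical replication worker for subscription %u", subid);
}
```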

~~~

7. src/backend/replication/logical/launcher.c - logicalrep_worker_stop_internal

+
+ Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+

I think there should be a comment here to say that this lock is
required/expected to be released by the caller of this function.

======

8. src/backend/replication/logical/origin.c - replorigin_session_setup

@@ -1068,7 +1068,7 @@ ReplicationOriginExitCleanup(int code, Datum arg)
* with replorigin_session_reset().
*/
void
-replorigin_session_setup(RepOriginId node)
+replorigin_session_setup(RepOriginId node, bool acquire)
{

This function has been problematic for several reviews. I saw that you
removed the previously confusing comment but I still feel some kind of
explanation is needed for the vague 'acquire' parameter. OTOH perhaps
if you just change the param name to 'must_acquire' then I think it
would be self-explanatory.
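To illustrate why the name matters, here is a minimal sketch of the two behaviours the flag selects. My understanding of them comes from the comment quoted in #29: the leader acquires the origin, while an apply background worker only attaches to it without monopolizing it. OriginSlot and origin_session_setup() are hypothetical and greatly simplified. The real function works on shared replication-state slots and raises errors instead of returning false.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical, heavily simplified replication origin slot. */
typedef struct
{
    int         acquired_by;    /* pid of the owning process, 0 if free */
} OriginSlot;

/*
 * must_acquire = true:  the caller (leader apply worker) must become the
 *                       exclusive owner; fails if the slot is already owned.
 * must_acquire = false: the caller (apply background worker) only attaches
 *                       to a slot its leader already owns; ownership is not
 *                       changed.
 * Returns false where the real code would raise an error.
 */
static bool
origin_session_setup(OriginSlot *slot, int my_pid, bool must_acquire)
{
    assert(my_pid != 0);

    if (!must_acquire)
        return slot->acquired_by != 0;

    if (slot->acquired_by != 0)
        return false;

    slot->acquired_by = my_pid;
    return true;
}
```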

======

9. src/backend/replication/logical/worker.c - General

Some of the logs have a prefix "[Apply BGW #%u]" and some do not; I
did not really understand how you decided whether to add the prefix, so I
did not comment on them individually. Are they all OK? If you can
explain the reason for the choices, I can review them better next
time.

~~~

10. src/backend/replication/logical/worker.c - General

There are multiple places in the code where there is code checking
if/else for bgworker or normal apply worker. And in those places,
there is often a comment like:

"If we are in main apply worker..."

But it is redundant to say "If we are" because we know we are.
Instead, those cases should have a comment at the top of the else block, like:

/* This is the main apply worker. */

And then the "If we are in main apply worker" text can be removed from
the comment. There are many examples in the patch like this. Please
search and modify all of them.

~~~

11. src/backend/replication/logical/worker.c - file header comment

The whole comment is similar to the commit message so any changes made
there (for #2, #3) should be made here also.

~~~

12. src/backend/replication/logical/worker.c

+typedef struct WorkerEntry
+{
+ TransactionId xid;
+ WorkerState *wstate;
+} WorkerEntry;

Missing comment for this structure

~~~

13. src/backend/replication/logical/worker.c

WorkerState
WorkerEntry

I felt that these struct names are too generic; shouldn't they be
something more like ApplyBgworkerState and ApplyBgworkerEntry?
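For example, something along these lines would cover #12 with a header comment and #13 with the suggested names. The comment wording is only a suggestion based on how the entry is used: an xid key mapped to a worker. The single member shown for ApplyBgworkerState is a placeholder, because the real struct has more fields.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* same width as PostgreSQL's */

/* State of one apply background worker (placeholder member only). */
typedef struct ApplyBgworkerState
{
    unsigned int n;                 /* worker number, for log messages */
} ApplyBgworkerState;

/*
 * Entry in the hash table that maps the xid of a streamed transaction to the
 * apply background worker that is applying it.
 */
typedef struct ApplyBgworkerEntry
{
    TransactionId       xid;        /* hash key: streamed transaction's xid */
    ApplyBgworkerState *wstate;     /* worker assigned to this transaction */
} ApplyBgworkerEntry;
```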

~~~

14. src/backend/replication/logical/worker.c

+static List *ApplyWorkersIdleList = NIL;

IMO maybe ApplyWorkersFreeList is a better name than IdleList for
this. "Idle" sounds just like it is paused rather than available for
someone else to use. If you change this, then please search the rest of
the patch for mentions in log messages, etc.

~~~

15. src/backend/replication/logical/worker.c

+static WorkerState *stream_apply_worker = NULL;
+
+/* check if we apply transaction in apply bgworker */
+#define apply_bgworker_active() (in_streamed_transaction && stream_apply_worker != NULL)

Wording: "if we apply transaction" -> "if we are applying the transaction"
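With the wording fix applied, and the macro wrapped with a backslash so that it survives line wrapping, the definition might read as below. The globals are minimal stand-ins that let the snippet compile on its own. WorkerState is left as an opaque type here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct WorkerState WorkerState;    /* opaque in this sketch */

static bool in_streamed_transaction = false;
static WorkerState *stream_apply_worker = NULL;

/* Check if we are applying the transaction in an apply background worker. */
#define apply_bgworker_active() \
    (in_streamed_transaction && stream_apply_worker != NULL)
```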

~~~

16. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ * For the main apply worker, if in streaming mode (receiving a block of
+ * streamed transaction), we send the data to the apply background worker.
+ *
+ * For the apply background worker, define a savepoint if new subtransaction
+ * was started.
*
* Returns true for streamed transactions, false otherwise (regular mode).
*/
static bool
handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)

16a.
Typo: "if new subtransaction" -> "if a new subtransaction"

16b.
That "regular mode" comment seems not quite right because IIUC it also
returns false for a bgworker (which hardly seems like a "regular
mode").

~~~

17. src/backend/replication/logical/worker.c - handle_streamed_transaction

- /* not in streaming mode */
- if (!in_streamed_transaction)
+ /*
+ * Return if we are not in streaming mode and are not in an apply
+ * background worker.
+ */
+ if (!in_streamed_transaction && !am_apply_bgworker())
return false;

Somehow I found this condition confusing, and the comment is not helpful
either because it just says exactly what the code says. Can you give a
better explanatory comment?

e.g.
Maybe the comment should be:
"Return if not in streaming mode (unless this is an apply background worker)"

e.g.
Maybe the condition is easier to understand if written as:
if (!(in_streamed_transaction || am_apply_bgworker()))
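The two forms are equivalent by De Morgan's law, so the rewrite only improves readability. Here is a small self-contained check that compares them over all four input combinations. The function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

/* Condition as written in the patch. */
static bool
skip_original(bool in_streamed_transaction, bool am_bgworker)
{
    return !in_streamed_transaction && !am_bgworker;
}

/* Suggested form: return unless streaming or in an apply background worker. */
static bool
skip_suggested(bool in_streamed_transaction, bool am_bgworker)
{
    return !(in_streamed_transaction || am_bgworker);
}
```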

~~~

18. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ if (action == LOGICAL_REP_MSG_RELATION)
+ {
+ LogicalRepRelation *rel = logicalrep_read_rel(s);
+ logicalrep_relmap_update(rel);
+ }
+
+ }
+ else
+ {
+ /* Add the new subxact to the array (unless already there). */
+ subxact_info_add(current_xid);

Unnecessary blank line.

~~~

19. src/backend/replication/logical/worker.c - apply_bgworker_find_or_start

+ if (found)
+ {
+ entry->wstate->pstate->state = APPLY_BGWORKER_BUSY;
+ return entry->wstate;
+ }
+ else if (!start)
+ return NULL;
+
+ /* If there is at least one worker in the idle list, then take one. */
+ if (list_length(ApplyWorkersIdleList) > 0)

I felt that there should be a comment (after the return NULL) that says:

/*
* Start a new apply background worker
*/

~~~

20. src/backend/replication/logical/worker.c - apply_bgworker_free

+/*
+ * Add the worker to the freelist and remove the entry from hash table.
+ */
+static void
+apply_bgworker_free(WorkerState *wstate)

20a.
Typo: "freelist" -> "free list"

20b.
Elsewhere (and in the log message) this is called the idle list (but
actually I prefer "free list" like in this comment). See also comment
#14.
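To make the lookup and free-list flow of #19 and #20 concrete, here is a minimal single-process sketch. All names are hypothetical. Fixed-size arrays stand in for ApplyWorkersHash and the idle/free list, and "starting" a worker just claims the next pool slot. No shared memory or real background processes are involved.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical, simplified stand-ins for the patch's structures. */
typedef enum { WORKER_FREE, WORKER_BUSY } WorkerStatus;

typedef struct
{
    int          n;             /* worker number */
    WorkerStatus state;
} Worker;

#define MAX_WORKERS 4

static Worker  pool[MAX_WORKERS];          /* "started" workers */
static int     n_started = 0;
static Worker *free_list[MAX_WORKERS];     /* workers available for reuse */
static int     n_free = 0;

/* Tiny xid -> worker map standing in for the hash table. */
static struct { TransactionId xid; Worker *w; } map[MAX_WORKERS];
static int     n_map = 0;

static Worker *
find_or_start(TransactionId xid, bool start)
{
    Worker     *w;

    for (int i = 0; i < n_map; i++)
    {
        if (map[i].xid == xid)
        {
            map[i].w->state = WORKER_BUSY;
            return map[i].w;
        }
    }
    if (!start)
        return NULL;

    /*
     * Start a new apply background worker.
     *
     * If there is at least one worker in the free list, then take one.
     */
    if (n_free > 0)
        w = free_list[--n_free];
    else if (n_started < MAX_WORKERS)
    {
        w = &pool[n_started];
        w->n = n_started++;
    }
    else
        return NULL;            /* no worker available */

    w->state = WORKER_BUSY;
    map[n_map].xid = xid;
    map[n_map].w = w;
    n_map++;
    return w;
}

/* Add the worker to the free list and remove the entry from the hash table. */
static void
worker_free(TransactionId xid)
{
    for (int i = 0; i < n_map; i++)
    {
        if (map[i].xid == xid)
        {
            Worker     *w = map[i].w;

            w->state = WORKER_FREE;
            free_list[n_free++] = w;
            map[i] = map[--n_map];      /* unordered removal */
            return;
        }
    }
}
```

The point of the free list is visible in the last step of the usage: a later transaction reuses an existing worker instead of starting another one.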

~~~

21. src/backend/replication/logical/worker.c - apply_bgworker_free

+ hash_search(ApplyWorkersHash, &xid,
+ HASH_REMOVE, &found);

21a.
If you are not going to check the value of ‘found’ then why bother to
pass this param at all; can’t you just pass NULL? (I think I asked the
same question in a previous review)

21b.
The wrapping over 2 lines seems unnecessary here.

~~~

22. src/backend/replication/logical/worker.c - apply_handle_stream_start

/*
- * Initialize the worker's stream_fileset if we haven't yet. This will be
- * used for the entire duration of the worker so create it in a permanent
- * context. We create this on the very first streaming message from any
- * transaction and then use it for this and other streaming transactions.
- * Now, we could create a fileset at the start of the worker as well but
- * then we won't be sure that it will ever be used.
+ * If we are in main apply worker, check if there is any free bgworker
+ * we can use to process this transaction.
*/
- if (MyLogicalRepWorker->stream_fileset == NULL)
+ stream_apply_worker = apply_bgworker_find_or_start(stream_xid, first_segment);

22a.
Typo: "in main apply worker" -> "in the main apply worker"

22b.
Since this is not if/else code, it might be better to put
Assert(!am_apply_bgworker()); above this just to make it more clear.

~~~

23. src/backend/replication/logical/worker.c - apply_handle_stream_start

+ /*
+ * If we have free worker or we already started to apply this
+ * transaction in bgworker, we pass the data to worker.
+ */

SUGGESTION
If we have found a free worker or if we are already applying this
transaction in an apply background worker, then we pass the data to
that worker.

~~~

24. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+apply_handle_stream_abort(StringInfo s)
{
- StringInfoData s2;
- int nchanges;
- char path[MAXPGPATH];
- char *buffer = NULL;
- MemoryContext oldcxt;
- BufFile *fd;
+ TransactionId xid;
+ TransactionId subxid;

- maybe_start_skipping_changes(lsn);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));

Typo?

Shouldn't that errmsg say "STREAM ABORT message..." instead of "STREAM
COMMIT message..."

~~~

25. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ for(i = list_length(subxactlist) - 1; i >= 0; i--)
+ {

Missing space after "for"

~~~

26. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+ if (found)
+ {
+ elog(LOG, "rolled back to savepoint %s", spname);
+ RollbackToSavepoint(spname);
+ CommitTransactionCommand();
+ subxactlist = list_truncate(subxactlist, i + 1);
+ }

Does this need to log anything if nothing was found? Or is it ok to
leave as-is and silently ignore it?
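One possible answer, sketched standalone, is to search from the most recent subtransaction, truncate as the quoted code does, and emit a log line when the subxid is not found. SubxactList and rollback_to_subxact() are hypothetical. printf() stands in for elog(), and the real RollbackToSavepoint()/CommitTransactionCommand() calls are reduced to a comment.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint32_t TransactionId;

/* Hypothetical stand-in for the List of subxids kept by the patch. */
typedef struct
{
    TransactionId xids[16];
    int           len;
} SubxactList;

/*
 * Search from the most recent entry; on a match, truncate the list so it
 * ends at that entry (like list_truncate(subxactlist, i + 1)).
 */
static bool
rollback_to_subxact(SubxactList *list, TransactionId subxid)
{
    for (int i = list->len - 1; i >= 0; i--)
    {
        if (list->xids[i] == subxid)
        {
            /* Real code: RollbackToSavepoint(); CommitTransactionCommand(); */
            printf("rolled back to savepoint for subxact %u\n", (unsigned) subxid);
            list->len = i + 1;
            return true;
        }
    }

    /* Not found: log it instead of silently ignoring it. */
    printf("subxact %u not found, nothing to roll back\n", (unsigned) subxid);
    return false;
}
```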

~~~

27. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

+ if (len == 0)
+ {
+ elog(LOG, "[Apply BGW #%u] got zero-length message, stopping", pst->n);
+ break;
+ }

Maybe it is unnecessary to say "stopping" because it will say that in
the next log anyway when it breaks out of the main loop.

~~~

28. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

+ default:
+ elog(ERROR, "unexpected message");
+ break;

Perhaps the switch byte should be in a variable so that you can log
which unexpected byte code was received, e.g. similar to the
apply_handle_tuple_routing function.
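For example, keeping the byte in a local variable lets the default branch report it. In this sketch dispatch() is hypothetical. The 'w' and 'k' cases are placeholders, not the patch's actual message types, and snprintf() into a buffer stands in for elog(ERROR, ...).

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Dispatch on the message-type byte; for an unknown byte, format an error
 * that includes the byte itself and return NULL.
 */
static const char *
dispatch(char action, char *errbuf, size_t errlen)
{
    switch (action)
    {
        case 'w':               /* placeholder message type */
            return "data";
        case 'k':               /* placeholder message type */
            return "keepalive";
        default:
            snprintf(errbuf, errlen, "unexpected message type \"%c\" (0x%02x)",
                     action, (unsigned char) action);
            return NULL;
    }
}
```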

~~~

29. src/backend/replication/logical/worker.c - LogicalApplyBgwMain

+ /*
+ * The apply bgworker don't need to monopolize this replication origin
+ * which was already acquired by its leader process.
+ */
+ replorigin_session_setup(originid, false);
+ replorigin_session_origin = originid;
+ CommitTransactionCommand();

Typo: "The apply bgworker don't need ..."

-> "The apply background workers don't need ..."
or -> "The apply background worker doesn't need ..."

~~~

30. src/backend/replication/logical/worker.c - apply_bgworker_setup

+/*
+ * Start apply worker background worker process and allocate shared memory for
+ * it.
+ */
+static WorkerState *
+apply_bgworker_setup(void)

Typo: "apply worker background worker process" -> "apply background
worker process"

~~~

31. src/backend/replication/logical/worker.c - apply_bgworker_wait_for

+ /* If any workers (or the postmaster) have died, we have failed. */
+ if (status == APPLY_BGWORKER_EXIT)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u failed to apply transaction %u",
+ wstate->pstate->n, wstate->pstate->stream_xid)));

The errmsg should start with a lowercase letter.

~~~

32. src/backend/replication/logical/worker.c - check_workers_status

+ /*
+ * We don't lock here as in the worst case we will just detect the
+ * failure of worker a bit later.
+ */
+ if (wstate->pstate->state == APPLY_BGWORKER_EXIT)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("Background worker %u exited unexpectedly",
+ wstate->pstate->n)));

The errmsg should start with a lowercase letter.

~~~

33. src/backend/replication/logical/worker.c - apply_bgworker_set_state

+/* Set the state of apply background worker */
+static void
+apply_bgworker_set_state(char state)

Maybe OK, or perhaps choose from one of:
- "Set the state of an apply background worker"
- "Set the apply background worker state"

======

34. src/bin/pg_dump/pg_dump.c - getSubscriptions

@@ -4450,7 +4450,7 @@ getSubscriptions(Archive *fout)
if (fout->remoteVersion >= 140000)
appendPQExpBufferStr(query, " s.substream,\n");
else
- appendPQExpBufferStr(query, " false AS substream,\n");
+ appendPQExpBufferStr(query, " 'f' AS substream,\n");

Is that logic right? Before this patch the attribute was bool; now it
is char. So doesn't there need to be some conversion/mapping here
when reading from a server >= 140000 where the column is still bool,
i.e. converting 'false' -> 'f' and 'true' -> 't'?

======

35. src/include/replication/origin.h

@@ -53,7 +53,7 @@ extern XLogRecPtr
replorigin_get_progress(RepOriginId node, bool flush);

extern void replorigin_session_advance(XLogRecPtr remote_commit,
XLogRecPtr local_commit);
-extern void replorigin_session_setup(RepOriginId node);
+extern void replorigin_session_setup(RepOriginId node, bool acquire);

As previously suggested in comment #8, maybe the 2nd param should be
'must_acquire'.

======

36. src/include/replication/worker_internal.h

@@ -60,6 +60,8 @@ typedef struct LogicalRepWorker
*/
FileSet *stream_fileset;

+ bool subworker;
+

Probably this new member deserves a comment.

------

Kind Regards,
Peter Smith.
Fujitsu Australia
