Re: [HACKERS] logical decoding of two-phase transactions

From: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
To: "Peter(dot)B(dot)Smith(at)fujitsu(dot)com" <Peter(dot)B(dot)Smith(at)fujitsu(dot)com>
Cc: Ajin Cherian <itsajin(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Tom Lane <tgl(at)sss(dot)pgh(dot)pa(dot)us>, Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: [HACKERS] logical decoding of two-phase transactions
Date: 2020-10-07 10:12:04
Message-ID: CAA4eK1LrZrxgcW9UpspF2CWwH=Pm+OnGz_3MpUxzL7r2NnnTnw@mail.gmail.com
Lists: pgsql-hackers

On Tue, Oct 6, 2020 at 10:23 AM Peter(dot)B(dot)Smith(at)fujitsu(dot)com
<Peter(dot)B(dot)Smith(at)fujitsu(dot)com> wrote:
>
>
> ==========
> Patch V6-0001, File: doc/src/sgml/logicaldecoding.sgml
> ==========
>
> COMMENT/QUESTION
> Section 48.6.1
> @@ -387,6 +387,10 @@ typedef struct OutputPluginCallbacks
> LogicalDecodeTruncateCB truncate_cb;
> LogicalDecodeCommitCB commit_cb;
> LogicalDecodeMessageCB message_cb;
> + LogicalDecodeFilterPrepareCB filter_prepare_cb;
>
> Confused by the mixing of terminologies "abort" and "rollback".
> Why is it LogicalDecodeAbortPreparedCB instead of
> LogicalDecodeRollbackPreparedCB?
> Why is it abort_prepared_cb instead of rollback_prepared_cb?
>
> I thought everything the user sees should be ROLLBACK/rollback (like
> the SQL) regardless of what the internal functions might be called.
>
> ;
>

Fair enough.

> COMMENT
> Section 48.6.1
> The begin_cb, change_cb and commit_cb callbacks are required, while
> startup_cb, filter_by_origin_cb, truncate_cb, and shutdown_cb are
> optional. If truncate_cb is not set but a TRUNCATE is to be decoded,
> the action will be ignored.
>
> The 1st paragraph beneath the typedef does not mention the newly added
> callbacks to say if they are required or optional.
>
> ;

Yeah, this is mentioned in the code comments but missed here; see the
comment "To support two phase logical decoding, we require
prepare/commit-prepare/abort-prepare callbacks. The filter-prepare
callback is optional.". I think instead of directly editing the above
paragraph we can write a new one, similar to what we have done for
streaming of large in-progress transactions (refer to <para> An output
plugin may also define functions to support streaming of large,
in-progress transactions.).
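The rule that the new paragraph needs to state is the one in that code comment. Below is a minimal sketch of it. It uses hypothetical stand-in types (DemoTwoPhaseCallbacks and the demo_* functions, not the real OutputPluginCallbacks). It also assumes that only the three required callbacks count toward enabling two-phase decoding:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical, trimmed-down stand-in for the patch's callback struct; the
 * real OutputPluginCallbacks has many more members and real signatures.
 */
typedef void (*DemoTxnCB) (void);
typedef bool (*DemoFilterCB) (void);

typedef struct DemoTwoPhaseCallbacks
{
    DemoTxnCB    prepare_cb;          /* required */
    DemoTxnCB    commit_prepared_cb;  /* required */
    DemoTxnCB    abort_prepared_cb;   /* required */
    DemoFilterCB filter_prepare_cb;   /* optional */
} DemoTwoPhaseCallbacks;

/* Assumption: two-phase decoding is enabled if ANY required callback is set. */
static bool
demo_twophase_enabled(const DemoTwoPhaseCallbacks *cb)
{
    return cb->prepare_cb != NULL ||
        cb->commit_prepared_cb != NULL ||
        cb->abort_prepared_cb != NULL;
}

/* True only if every required callback is present; the filter is ignored. */
static bool
demo_twophase_complete(const DemoTwoPhaseCallbacks *cb)
{
    return cb->prepare_cb != NULL &&
        cb->commit_prepared_cb != NULL &&
        cb->abort_prepared_cb != NULL;
}

/* No-op callback, used only to populate the struct in examples. */
static void
demo_noop_cb(void)
{
}
```

Consider a plugin that sets only some of the required callbacks. It ends up with two-phase decoding enabled but incomplete. That is the "missing methods" case the wrappers can then report.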

>
> COMMENT
> Section 48.6.4.5
> Section 48.6.4.6
> Section 48.6.4.7
> @@ -578,6 +588,55 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
> </para>
> </sect3>
>
> + <sect3 id="logicaldecoding-output-plugin-prepare">
> + <sect3 id="logicaldecoding-output-plugin-commit-prepared">
> + <sect3 id="logicaldecoding-output-plugin-abort-prepared">
> +<programlisting>
>
> The wording and titles are a bit backwards compared to the others.
> e.g. previously was "Transaction Begin" (not "Begin Transaction") and
> "Transaction End" (not "End Transaction").
>
> So, to be consistent with the existing titles, IMO these new
> titles (and wording) should change to:
> - "Commit Prepared Transaction Callback" --> "Transaction Commit
> Prepared Callback"
> - "Rollback Prepared Transaction Callback" --> "Transaction Rollback
> Prepared Callback"
>

Makes sense.

> - "whenever a commit prepared transaction has been decoded" -->
> "whenever a transaction commit prepared has been decoded"
> - "whenever a rollback prepared transaction has been decoded." -->
> "whenever a transaction rollback prepared has been decoded."
>
> ;

I don't find the above suggestions much better than the current wording.
How about the following instead?

"whenever we decode the commit of a transaction which was prepared for
two-phase commit"
"whenever we decode the rollback of a transaction which was prepared for
two-phase commit"

Also, related to this:
+ <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+ <title>Commit Prepared Transaction Callback</title>
+
+ <para>
+ The optional <function>commit_prepared_cb</function> callback is called whenever
+ a commit prepared transaction has been decoded. The <parameter>gid</parameter> field,
+ which is part of the <parameter>txn</parameter> parameter can be used in this
+ callback.
+<programlisting>
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+</programlisting>
+ </para>
+ </sect3>
+
+ <sect3 id="logicaldecoding-output-plugin-abort-prepared">
+ <title>Rollback Prepared Transaction Callback</title>
+
+ <para>
+ The optional <function>abort_prepared_cb</function> callback is called whenever
+ a rollback prepared transaction has been decoded. The <parameter>gid</parameter> field,
+ which is part of the <parameter>txn</parameter> parameter can be used in this
+ callback.
+<programlisting>

Neither of the above callbacks is optional as per the code, and I think
the code is correct. The documentation is wrong here.

>
> ==========
> Patch V6-0001, File: src/backend/replication/logical/decode.c
> ==========
>
> COMMENT
> Line 74
> @@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
> xl_xact_parsed_commit *parsed, TransactionId xid);
> static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
> xl_xact_parsed_abort *parsed, TransactionId xid);
> +static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
> + xl_xact_parsed_prepare * parsed);
>
> The 2nd line of DecodePrepare is misaligned by one space.
>
> ;

Yeah, probably pgindent is the answer. Ajin, can you please run
pgindent on all the patches?

>
> COMMENT
> Line 321
> @@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> }
> break;
> case XLOG_XACT_PREPARE:
> + {
> + xl_xact_parsed_prepare parsed;
> + xl_xact_prepare *xlrec;
> + /* check that output plugin is capable of twophase decoding */
>
> "twophase" --> "two-phase"
>
> ~
>
> Also, add a blank line after the declarations.
>
> ;
>
> ==========
> Patch V6-0001, File: src/backend/replication/logical/logical.c
> ==========
>
> COMMENT
> Line 249
> @@ -225,6 +237,19 @@ StartupDecodingContext(List *output_plugin_options,
> (ctx->callbacks.stream_message_cb != NULL) ||
> (ctx->callbacks.stream_truncate_cb != NULL);
>
> + /*
> + * To support two phase logical decoding, we require
> prepare/commit-prepare/abort-prepare
> + * callbacks. The filter-prepare callback is optional. We however
> enable two phase logical
> + * decoding when at least one of the methods is enabled so that we
> can easily identify
> + * missing methods.
>
> The term is generally known as "two-phase" (with the
> hyphen) https://en.wikipedia.org/wiki/Two-phase_commit_protocol so
> let's be consistent in all the patch code comments. Please search the
> code and correct this in all places, including any I may have
> missed.
>
> "two phase" --> "two-phase"
>
> ;
>
> COMMENT
> Line 822
> @@ -782,6 +807,111 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> }
>
> static void
> +prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> + XLogRecPtr prepare_lsn)
>
> "support 2 phase" --> "supports two-phase" in the comment
>
> ;
>
> COMMENT
> Line 844
> Code condition seems strange and/or broken.
> if (ctx->enable_twophase && ctx->callbacks.prepare_cb == NULL)
> Because if the flag is false then this condition is skipped.
> But then if the callback is also NULL, attempting to call it to
> "do the actual work" will dereference a NULL pointer.
>
> ~
>
> Also, I wonder whether this check should be the first thing in this function.
> Because if it fails, does it even make sense that all the errcallback
> code was set up?
> E.g. errcallback.arg potentially is left pointing to a stack variable
> on a stack that no longer exists.
>
> ;

Right, I think we should have an Assert(ctx->enable_twophase) at the
beginning and then keep the check (ctx->callbacks.prepare_cb == NULL)
at its current place. Refer to any of the streaming APIs (e.g.
stream_stop_cb_wrapper).
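The suggested ordering is sketched below. It uses a hypothetical DemoCtx instead of the real LogicalDecodingContext. It also returns false at the point where the real wrapper would raise ereport(ERROR). The mode is asserted first, and the required callback is then checked for NULL before it is ever called:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-ins for the decoding context and its prepare callback. */
typedef struct DemoCtx DemoCtx;
typedef void (*DemoPrepareCB) (DemoCtx *ctx);

struct DemoCtx
{
    bool          enable_twophase;
    DemoPrepareCB prepare_cb;
    int           ncalls;       /* how many times the callback ran */
};

static bool
demo_prepare_cb_wrapper(DemoCtx *ctx)
{
    /* Only supposed to be reached when two-phase decoding is enabled. */
    assert(ctx->enable_twophase);

    /* (the error context callback would be set up here) */

    /* In two-phase mode the prepare callback is required. */
    if (ctx->prepare_cb == NULL)
    {
        fprintf(stderr, "prepare_cb is required for two-phase decoding\n");
        return false;           /* the real wrapper would ereport(ERROR) */
    }

    /* Safe: the pointer was checked above. */
    ctx->prepare_cb(ctx);
    return true;
}

/* Example callback that just counts invocations. */
static void
demo_count_call(DemoCtx *ctx)
{
    ctx->ncalls++;
}
```

The flag and the callback are no longer combined in one condition. So the "flag off, callback NULL" combination can no longer fall through to the call. Reaching the wrapper with the flag off is treated as a caller bug.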

>
> COMMENT
> Line 857
> +commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
>
> "support 2 phase" --> "supports two-phase" in the comment
>
> ~
>
> Also, Same potential trouble with the condition:
> if (ctx->enable_twophase && ctx->callbacks.commit_prepared_cb == NULL)
> Same as previously asked. Should this check be first thing in this function?
>
> ;

Yeah, so the same solution as mentioned above can be used.

>
> COMMENT
> Line 892
> +abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
>
> "support 2 phase" --> "supports two-phase" in the comment
>
> ~
>
> Same potential trouble with the condition:
> if (ctx->enable_twophase && ctx->callbacks.abort_prepared_cb == NULL)
> Same as previously asked. Should this check be the first thing in this function?
>
> ;

Again the same solution can be used.

>
> COMMENT
> Line 1013
> @@ -858,6 +988,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> error_context_stack = errcallback.previous;
> }
>
> +static bool
> +filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
> + TransactionId xid, const char *gid)
>
> Fix wording in comment:
> "twophase" --> "two-phase transactions"
> "twophase transactions" --> "two-phase transactions"
>
> ==========
> Patch V6-0001, File: src/backend/replication/logical/reorderbuffer.c
> ==========
>
> COMMENT
> Line 255
> @@ -251,7 +251,8 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
> static void ReorderBufferRestoreChange(ReorderBuffer *rb,
> ReorderBufferTXN *txn,
> char *change);
> static void ReorderBufferRestoreCleanup(ReorderBuffer *rb,
> ReorderBufferTXN *txn);
> -static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
> +static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> + bool txn_prepared);
>
> The alignment is inconsistent. One more space needed before "bool txn_prepared"
>
> ;
>
> COMMENT
> Line 417
> @@ -413,6 +414,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> }
>
> /* free data that's contained */
> + if (txn->gid != NULL)
> + {
> + pfree(txn->gid);
> + txn->gid = NULL;
> + }
>
> Should add back the blank line before this new code, as it was before.
>
> ;
>
> COMMENT
> Line 1564
> @@ -1502,12 +1561,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> }
>
> /*
> - * Discard changes from a transaction (and subtransactions), after streaming
> - * them. Keep the remaining info - transactions, tuplecids, invalidations and
> - * snapshots.
> + * Discard changes from a transaction (and subtransactions), either after streaming or
> + * after a PREPARE.
>
> typo "snapshots.If" -> "snapshots. If"
>
> ;
>
> COMMENT/QUESTION
> Line 1590
> @@ -1526,7 +1587,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> Assert(rbtxn_is_known_subxact(subtxn));
> Assert(subtxn->nsubtxns == 0);
>
> - ReorderBufferTruncateTXN(rb, subtxn);
> + ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
> }
>
> There are some code paths here where I did not understand how they match the comments.
> Because this function is recursive it seems that it may be called
> where the 2nd parameter txn is a sub-transaction.
>
> But then this seems at odds with some of the other code comments of
> this function, which process the txn without ever testing whether it
> is really toplevel or not:
>
> e.g. Line 1593 "/* cleanup changes in the toplevel txn */"

I think this comment is wrong but this is not the fault of this patch.

> e.g. Line 1632 "They are always stored in the toplevel transaction."
>
> ;

This seems to be correct and we probably need an Assert that the
transaction is a top-level transaction.

>
> COMMENT
> Line 1644
> @@ -1560,9 +1621,33 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
> * about the toplevel xact (we send the XID in all messages), but we never
> * stream XIDs of empty subxacts.
> */
> - if ((!txn->toptxn) || (txn->nentries_mem != 0))
> + if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
> txn->txn_flags |= RBTXN_IS_STREAMED;
>
> + if (txn_prepared)
>
> /* remove the change from it's containing list */
> typo "it's" --> "its"
>
> ;
>
> QUESTION
> Line 1977
> @@ -1880,7 +1965,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> ReorderBufferChange *specinsert)
> {
> /* Discard the changes that we just streamed */
> - ReorderBufferTruncateTXN(rb, txn);
> + ReorderBufferTruncateTXN(rb, txn, false);
>
> How do you know the 3rd parameter - i.e. txn_prepared - should be
> hardwired false here?
> e.g. I thought that maybe rbtxn_prepared(txn) can be true here.
>
> ;
>
> COMMENT
> Line 2345
> @@ -2249,7 +2334,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> break;
> }
> }
> -
> /*
>
> Looks like an accidental blank line deletion. This should be put back the way it was.
>
> ;
>
> COMMENT/QUESTION
> Line 2374
> @@ -2278,7 +2362,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> }
> }
> else
> - rb->commit(rb, txn, commit_lsn);
> + {
> + /*
> + * Call either PREPARE (for twophase transactions) or COMMIT
> + * (for regular ones).
>
> "twophase" --> "two-phase"
>
> ~
>
> Also, I was confused by the apparent assumption of exclusiveness of
> streaming and 2PC...
> E.g. if it is streaming AND 2PC, then it won't call rb->prepare().
>
> ;
>
> QUESTION
> Line 2424
> @@ -2319,11 +2412,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> */
> if (streaming)
> {
> - ReorderBufferTruncateTXN(rb, txn);
> + ReorderBufferTruncateTXN(rb, txn, false);
>
> /* Reset the CheckXidAlive */
> CheckXidAlive = InvalidTransactionId;
> }
> + else if (rbtxn_prepared(txn))
>
> I was confused by the exclusiveness of streaming/2PC.
> E.g. what if it is streaming AND 2PC at the same time? How can you pass
> false as the 3rd param to ReorderBufferTruncateTXN?
>
> ;

Yeah, this, and any other place where it is assumed that both can't
be true together, is wrong.
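Here is a minimal sketch of the non-exclusive handling after processing. DemoTXN is a hypothetical stand-in for ReorderBufferTXN, and demo_truncate_txn mimics only the streamed-flag part of the patch's ReorderBufferTruncateTXN. The third argument comes from the transaction's own prepared state instead of being hardwired to false. Full cleanup in the "neither" case is an assumption:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the relevant ReorderBufferTXN state. */
typedef struct DemoTXN
{
    bool prepared;      /* what rbtxn_prepared(txn) would report */
    bool streamed;      /* marked as streamed after truncation */
    bool truncated;     /* changes discarded, txn kept around */
    bool cleaned_up;    /* txn fully released */
} DemoTXN;

/* Like the patch: only a non-prepared truncation marks the txn as streamed. */
static void
demo_truncate_txn(DemoTXN *txn, bool txn_prepared)
{
    txn->truncated = true;
    if (!txn_prepared)
        txn->streamed = true;
}

static void
demo_after_processing(DemoTXN *txn, bool streaming)
{
    /* Streaming and prepared may both be true; neither excludes the other. */
    if (streaming || txn->prepared)
        demo_truncate_txn(txn, txn->prepared);  /* not hardwired to false */
    else
        txn->cleaned_up = true;                 /* assumed: full cleanup */
}
```

A transaction that is both streamed and prepared is truncated without being marked as streamed. This matches the `(!txn_prepared) && ...` guard in the patch hunk quoted earlier.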

>
> COMMENT
> Line 2463
> @@ -2352,17 +2451,18 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
>
> /*
> * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
> - * abort of the (sub)transaction we are streaming. We need to do the
> + * abort of the (sub)transaction we are streaming or preparing. We need to do the
> * cleanup and return gracefully on this error, see SetupCheckXidLive.
> */
>
> "twoi phase" --> "two-phase"
>
> ;
>
> QUESTIONS
> Line 2482
> @@ -2370,10 +2470,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
> errdata = NULL;
> curtxn->concurrent_abort = true;
>
> - /* Reset the TXN so that it is allowed to stream remaining data. */
> - ReorderBufferResetTXN(rb, txn, snapshot_now,
> - command_id, prev_lsn,
> - specinsert);
> + /* If streaming, reset the TXN so that it is allowed to stream remaining data. */
> + if (streaming)
>
> Re: /* If streaming, reset the TXN so that it is allowed to stream remaining data. */
> I was confused by the exclusiveness of streaming/2PC.
> Is it not possible for the streaming flag and rbtxn_prepared(txn) to be
> true at the same time?
>

Yeah, I think it is not correct to assume that both can't be true at
the same time. But when prepared is true, irrespective of whether
streaming is true or not, we can use the ReorderBufferTruncateTXN() API
instead of the Reset API.
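A minimal sketch of that decision for the concurrent-abort path follows. It uses a hypothetical enum for the possible outcomes. The prepared check comes first, so it wins regardless of streaming. Rethrowing the error when the transaction is neither streamed nor prepared is an assumption here, not something settled in this thread:

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical outcomes of handling a concurrent abort. */
typedef enum DemoAbortAction
{
    DEMO_RETHROW,               /* assumed: neither mode, propagate error */
    DEMO_RESET_TXN,             /* streaming only: reset, keep streaming */
    DEMO_TRUNCATE_PREPARED      /* prepared: truncate with txn_prepared=true */
} DemoAbortAction;

static DemoAbortAction
demo_on_concurrent_abort(bool streaming, bool prepared)
{
    /* Prepared takes precedence, irrespective of streaming. */
    if (prepared)
        return DEMO_TRUNCATE_PREPARED;
    if (streaming)
        return DEMO_RESET_TXN;
    return DEMO_RETHROW;
}
```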

> ~
>
> elog(LOG, "stopping decoding of %s (%u)",
> txn->gid[0] != '\0'? txn->gid:"", txn->xid);
>
> Is this a safe operation, or do you also need to test that txn->gid is not NULL?
>
> ;

I think if 'prepared' is true then we can assume it to be non-NULL,
otherwise, not.
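A minimal sketch of that rule, using a hypothetical DemoGidTXN stand-in. The gid is trusted without a check only for a prepared transaction and is NULL-checked otherwise:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the fields of ReorderBufferTXN used here. */
typedef struct DemoGidTXN
{
    bool        prepared;
    const char *gid;        /* set for prepared transactions */
    unsigned    xid;
} DemoGidTXN;

/* Returns a string that is always safe to pass to a %s format. */
static const char *
demo_gid_for_log(const DemoGidTXN *txn)
{
    if (txn->prepared)
    {
        /* A prepared transaction is expected to carry a gid. */
        assert(txn->gid != NULL);
        return txn->gid;
    }
    /* Otherwise the gid may legitimately be NULL. */
    return (txn->gid != NULL) ? txn->gid : "";
}
```

The caller could then log with something like elog(LOG, "stopping decoding of %s (%u)", demo_gid_for_log(txn), txn->xid). Once gid is known to be non-NULL, the original `txn->gid[0] != '\0'` test adds nothing, because an empty gid already prints as an empty string.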

I am responding to your email in phases so that we can discuss
specific points if required. I am also slightly afraid that a single
long reply might bounce, as happened in your case when you sent such a
long email.

--
With Regards,
Amit Kapila.
