From: | vignesh C <vignesh21(at)gmail(dot)com> |
---|---|
To: | Peter Smith <smithpb2250(at)gmail(dot)com> |
Cc: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Ajin Cherian <itsajin(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org> |
Subject: | Re: [HACKERS] logical decoding of two-phase transactions |
Date: | 2021-04-27 08:16:55 |
Message-ID: | CALDaNm0u=QGwd7jDAj-4u=7vvPn5rarFjBMCgfiJbDte55CWAA@mail.gmail.com |
Lists: | pgsql-hackers |
On Wed, Apr 21, 2021 at 12:13 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> On Tue, Apr 20, 2021 at 3:45 PM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
> >
> > Please find attached the latest patch set v73*
> >
> > Differences from v72* are:
> >
> > * Rebased to HEAD @ today (required because v72-0001 no longer applied cleanly)
> >
> > * Minor documentation correction for protocol messages for Commit Prepared ('K')
> >
> > * Non-functional code tidy (mostly proto.c) to reduce overloading
> > different meanings to same member names for prepare/commit times.
>
>
> Please find attached a re-posting of patch set v73*
>
> This is the same as yesterday's v73 but with a contrib module compile
> error fixed.
A few comments on
v73-0002-Add-prepare-API-support-for-streaming-transactio.patch:
1) The error messages for ALTER SUBSCRIPTION ... DROP PUBLICATION
differ slightly between the two cases below; we could keep them
similar:
postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH
(refresh = false, copy_data=true, two_phase=true);
ERROR: unrecognized subscription parameter: "copy_data"
postgres=# ALTER SUBSCRIPTION mysub drop PUBLICATION mypub WITH
(refresh = false, two_phase=true, streaming=true);
ERROR: cannot alter two_phase option
2) We are sending txn->xid twice in logicalrep_write_stream_prepare;
I feel we should send it only once:
+ /* transaction ID */
+ Assert(TransactionIdIsValid(txn->xid));
+ pq_sendint32(out, txn->xid);
+
+ /* send the flags field */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, prepare_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->u_op_time.prepare_time);
+ pq_sendint32(out, txn->xid);
+
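As a rough illustration of comment 2, here is a standalone sketch of the message body with the xid written only once. The Buf type and the send_* helpers are stand-ins I made up so the example compiles outside the tree (they are not the pq_send* API), and it covers only the fields visible in the hunk above.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for StringInfo: a small fixed-size output buffer. */
typedef struct Buf
{
	uint8_t		data[64];
	size_t		len;
} Buf;

/* Hypothetical stand-ins for pq_sendbyte/pq_sendint32/pq_sendint64. */
static void
send_byte(Buf *out, uint8_t v)
{
	assert(out->len < sizeof(out->data));
	out->data[out->len++] = v;
}

static void
send_int32(Buf *out, uint32_t v)
{
	/* most significant byte first (network byte order) */
	for (int shift = 24; shift >= 0; shift -= 8)
		send_byte(out, (uint8_t) (v >> shift));
}

static void
send_int64(Buf *out, uint64_t v)
{
	for (int shift = 56; shift >= 0; shift -= 8)
		send_byte(out, (uint8_t) (v >> shift));
}

/* Write the fields from the quoted hunk, with the xid appearing exactly once. */
static void
write_stream_prepare_sketch(Buf *out, uint32_t xid, uint8_t flags,
							uint64_t prepare_lsn, uint64_t end_lsn,
							int64_t prepare_time)
{
	send_int32(out, xid);		/* transaction ID, sent once */
	send_byte(out, flags);
	send_int64(out, prepare_lsn);
	send_int64(out, end_lsn);
	send_int64(out, (uint64_t) prepare_time);
	/* no trailing send_int32(out, xid) */
}
```

With the duplicate gone, these fields take 4 + 1 + 8 + 8 + 8 = 29 bytes instead of 33.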
3) We could remove the local variable xid and return
prepare_data->xid instead:
+TransactionId
+logicalrep_read_stream_prepare(StringInfo in,
LogicalRepPreparedTxnData *prepare_data)
+{
+ TransactionId xid;
+ uint8 flags;
+
+ xid = pq_getmsgint(in, 4);
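To show what I mean in comment 3, here is a standalone sketch of the read side that stores the xid straight into the output struct and returns it from there. The Cursor type, get_int helper and PreparedTxnSketch struct are hypothetical stand-ins for StringInfo, pq_getmsgint/pq_getmsgint64 and LogicalRepPreparedTxnData. The field layout assumes the duplicate xid from comment 2 has been dropped.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for LogicalRepPreparedTxnData. */
typedef struct PreparedTxnSketch
{
	uint32_t	xid;
	uint64_t	prepare_lsn;
	uint64_t	end_lsn;
	int64_t		prepare_time;
} PreparedTxnSketch;

/* Hypothetical stand-in for a StringInfo being read. */
typedef struct Cursor
{
	const uint8_t *data;
	size_t		len;
	size_t		pos;
} Cursor;

/* Read an nbytes-wide big-endian integer and advance the cursor. */
static uint64_t
get_int(Cursor *in, size_t nbytes)
{
	uint64_t	v = 0;

	assert(in->pos + nbytes <= in->len);
	for (size_t i = 0; i < nbytes; i++)
		v = (v << 8) | in->data[in->pos++];
	return v;
}

/* No local xid variable: fill prepare_data->xid and return that. */
static uint32_t
read_stream_prepare_sketch(Cursor *in, PreparedTxnSketch *prepare_data)
{
	uint8_t		flags;

	prepare_data->xid = (uint32_t) get_int(in, 4);

	flags = (uint8_t) get_int(in, 1);
	assert(flags == 0);			/* assumption: no flags defined yet */

	prepare_data->prepare_lsn = get_int(in, 8);
	prepare_data->end_lsn = get_int(in, 8);
	prepare_data->prepare_time = (int64_t) get_int(in, 8);

	return prepare_data->xid;
}
```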
4) Here the comment can be placed directly above
apply_spooled_messages for better readability:
+ /*
+ * 1. Replay all the spooled operations - Similar code as for
+ * apply_handle_stream_commit (i.e. non two-phase stream commit)
+ */
+
+ ensure_transaction();
+
+ nchanges = apply_spooled_messages(xid, prepare_data.prepare_lsn);
+
5) Similarly, the comment below can be placed directly above
PrepareTransactionBlock:
+ /*
+ * 2. Mark the transaction as prepared. - Similar code as for
+ * apply_handle_prepare (i.e. two-phase non-streamed prepare)
+ */
+
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand();
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+ PrepareTransactionBlock(gid);
+ CommitTransactionCommand();
+
+ pgstat_report_stat(false);
6) There is a lot of common code between apply_handle_stream_prepare
and apply_handle_prepare. If possible, move it into a common function
so that future fixes do not have to be made in both places:
+ /*
+ * 2. Mark the transaction as prepared. - Similar code as for
+ * apply_handle_prepare (i.e. two-phase non-streamed prepare)
+ */
+
+ /*
+ * BeginTransactionBlock is necessary to balance the EndTransactionBlock
+ * called within the PrepareTransactionBlock below.
+ */
+ BeginTransactionBlock();
+ CommitTransactionCommand();
+
+ /*
+ * Update origin state so we can restart streaming from correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = prepare_data.end_lsn;
+ replorigin_session_origin_timestamp = prepare_data.prepare_time;
+
+ PrepareTransactionBlock(gid);
+ CommitTransactionCommand();
+
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn);
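For comment 6, something along these lines is what I had in mind. This is only a standalone sketch: the *_stub functions, the origin_* variables and the PreparedTxnSketch struct are placeholders for the real backend calls and globals, and exactly which steps are shared by both callers is my assumption. Each stub just records that it was called, so the ordering can be checked.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Call log: each stub appends one letter so the call order is visible. */
static char call_log[32];

static void
log_call(char c)
{
	size_t		n = strlen(call_log);

	assert(n + 1 < sizeof(call_log));
	call_log[n] = c;
	call_log[n + 1] = '\0';
}

/* Placeholders for the real transaction-control calls. */
static void BeginTransactionBlock_stub(void) { log_call('B'); }
static void CommitTransactionCommand_stub(void) { log_call('C'); }
static void PrepareTransactionBlock_stub(const char *gid) { (void) gid; log_call('P'); }

/* Placeholders for replorigin_session_origin_lsn / _timestamp. */
static uint64_t origin_lsn;
static int64_t origin_timestamp;

/* Placeholder for the relevant LogicalRepPreparedTxnData fields. */
typedef struct PreparedTxnSketch
{
	uint64_t	end_lsn;
	int64_t		prepare_time;
} PreparedTxnSketch;

/*
 * Hypothetical common helper: the "mark the transaction as prepared"
 * sequence that both apply handlers would call.
 */
static void
apply_prepare_common_sketch(const PreparedTxnSketch *prepare_data,
							const char *gid)
{
	/* balance the EndTransactionBlock done inside PrepareTransactionBlock */
	BeginTransactionBlock_stub();
	CommitTransactionCommand_stub();

	/* update origin state so streaming can restart from the right place */
	origin_lsn = prepare_data->end_lsn;
	origin_timestamp = prepare_data->prepare_time;

	PrepareTransactionBlock_stub(gid);
	CommitTransactionCommand_stub();
}
```

Each handler would then keep only what is specific to it (for example, replaying the spooled changes in the streaming case) and call the helper for the rest.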
7) "two-phase commit" is slightly misleading here; the comment could
just say "streaming prepare":
+ * PREPARE callback (for streaming two-phase commit).
+ *
+ * Notify the downstream to prepare the transaction.
+ */
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
8) Should we include an Assert on in_streaming, similar to the other
pgoutput_stream_* functions?
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ Assert(rbtxn_is_streamed(txn));
+
+ OutputPluginUpdateProgress(ctx);
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
+ OutputPluginWrite(ctx, true);
+}
9) Here, too, we can verify that the transaction was streamed by
checking pg_stat_replication_slots:
+# check that transaction is committed on subscriber
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*),
count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'Rows inserted by 2PC have committed
on subscriber, and extra columns contain local defaults');
+$result = $node_subscriber->safe_psql('postgres', "SELECT count(*)
FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is committed on subscriber');
Regards,
Vignesh