RE: logical replication empty transactions

From: "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com>
To: Ajin Cherian <itsajin(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, Euler Taveira <euler(at)timbira(dot)com(dot)br>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Jeff Janes <jeff(dot)janes(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Craig Ringer <craig(at)2ndquadrant(dot)com>
Subject: RE: logical replication empty transactions
Date: 2022-03-02 02:00:55
Message-ID: OSZPR01MB631042EED2FF7A6F6079C54EFD039@OSZPR01MB6310.jpnprd01.prod.outlook.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

Hi,

Here are some comments on the v21 patch.

1.
+ WalSndKeepalive(false, 0);

Maybe we can use InvalidXLogRecPtr here, instead of 0.

2.
+ pq_sendint64(&output_message, writePtr ? writePtr : sentPtr);

Similarly, should we use XLogRecPtrIsInvalid()?

3.
@@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false);
}

+ if (in_streaming)
+ {
+ /* If streaming, send STREAM START if we haven't yet */
+ if (txndata && !txndata->sent_stream_start)
+ pgoutput_send_stream_start(ctx, txn);
+ }
+ else
+ {
+ /* If not streaming, send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+ }
+
+
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);

I am not sure if it is suitable to send begin or stream_start here, because the
row filter is not checked yet. That means, empty transactions caused by row
filter are not skipped.

4.
@@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
+ PGOutputTxnData *txndata = txn->output_plugin_private;
+ bool sent_begin_txn = txndata->sent_begin_txn;
+
Assert(rbtxn_is_streamed(txn));

- OutputPluginUpdateProgress(ctx);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
+ return;
+ }
+
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);

I notice that the patch skips stream prepared transaction, this would cause an
error on subscriber side when committing this transaction on publisher side, so
I think we'd better not do that.

For example:
(set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in
postgresql.conf)

-- publisher
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create publication pub for table test;

-- subscriber
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create subscription sub connection 'dbname=postgres port=5432' publication pub with(two_phase=on, streaming=on);

-- publisher
begin;
INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i);
prepare transaction 't';
commit prepared 't';

The error message in subscriber log:
ERROR: prepared transaction with identifier "pg_gid_16391_722" does not exist

Regards,
Shi yu

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Tom Lane 2022-03-02 02:05:02 Re: Condition pushdown: why (=) is pushed down into join, but BETWEEN or >= is not?
Previous Message osumi.takamichi@fujitsu.com 2022-03-02 01:49:01 RE: logical replication restrictions