From: | "shiy(dot)fnst(at)fujitsu(dot)com" <shiy(dot)fnst(at)fujitsu(dot)com> |
---|---|
To: | Ajin Cherian <itsajin(at)gmail(dot)com>, Peter Smith <smithpb2250(at)gmail(dot)com> |
Cc: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>, vignesh C <vignesh21(at)gmail(dot)com>, Michael Paquier <michael(at)paquier(dot)xyz>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, Euler Taveira <euler(at)timbira(dot)com(dot)br>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Jeff Janes <jeff(dot)janes(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Craig Ringer <craig(at)2ndquadrant(dot)com> |
Subject: | RE: logical replication empty transactions |
Date: | 2022-03-02 02:00:55 |
Message-ID: | OSZPR01MB631042EED2FF7A6F6079C54EFD039@OSZPR01MB6310.jpnprd01.prod.outlook.com |
Lists: | pgsql-hackers |
Hi,
Here are some comments on the v21 patch.
1.
+ WalSndKeepalive(false, 0);
Maybe we can use InvalidXLogRecPtr here, instead of 0.
2.
+ pq_sendint64(&output_message, writePtr ? writePtr : sentPtr);
Similarly, should we use XLogRecPtrIsInvalid()?
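To illustrate comments 1 and 2, here is a minimal standalone sketch. The typedef and macros are stand-ins that mirror what access/xlogdefs.h provides, and keepalive_report_ptr() is a hypothetical helper name, not something in the patch.

```c
#include <stdint.h>

/* Stand-ins mirroring the definitions in PostgreSQL's access/xlogdefs.h. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)
#define XLogRecPtrIsInvalid(r) ((r) == InvalidXLogRecPtr)

/*
 * Hypothetical helper: choose the LSN to report in a keepalive.
 * Falls back to sentPtr when the caller passed no explicit writePtr.
 */
static XLogRecPtr
keepalive_report_ptr(XLogRecPtr writePtr, XLogRecPtr sentPtr)
{
    return XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr;
}
```

With this spelling the call site would read WalSndKeepalive(false, InvalidXLogRecPtr), which makes the "no LSN supplied" intent explicit rather than relying on a bare 0.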
3.
@@ -1183,6 +1269,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Assert(false);
}
+ if (in_streaming)
+ {
+ /* If streaming, send STREAM START if we haven't yet */
+ if (txndata && !txndata->sent_stream_start)
+ pgoutput_send_stream_start(ctx, txn);
+ }
+ else
+ {
+ /* If not streaming, send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+ }
+
+
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
I am not sure it is appropriate to send BEGIN or STREAM START here, because the
row filter has not been checked yet. That means transactions that become empty
because of the row filter are not skipped.
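A rough standalone model of the ordering I have in mind, with the filter checked before the lazy BEGIN. All names here (TxnState, row_passes_filter, handle_change, handle_commit) are hypothetical and only simulate the control flow; this is not pgoutput code.

```c
#include <stdbool.h>

/* Hypothetical, simplified stand-in for the patch's PGOutputTxnData. */
typedef struct TxnState
{
    bool sent_begin_txn;  /* has BEGIN been sent for this transaction? */
    int  messages_sent;   /* counts BEGIN, change and COMMIT messages */
} TxnState;

/* Hypothetical row filter: publish only rows with a positive key. */
static bool
row_passes_filter(int key)
{
    return key > 0;
}

/* Process one change: filter first, then send BEGIN lazily. */
static void
handle_change(TxnState *txn, int key)
{
    if (!row_passes_filter(key))
        return;                     /* nothing sent, BEGIN still pending */

    if (!txn->sent_begin_txn)
    {
        txn->messages_sent++;       /* BEGIN */
        txn->sent_begin_txn = true;
    }
    txn->messages_sent++;           /* the change itself */
}

/* Returns false when the whole transaction was skipped as empty. */
static bool
handle_commit(TxnState *txn)
{
    if (!txn->sent_begin_txn)
        return false;
    txn->messages_sent++;           /* COMMIT */
    return true;
}
```

In this model a transaction whose rows are all filtered out sends nothing at all, while a transaction with at least one matching row sends BEGIN exactly once before the first published change.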
4.
@@ -1617,9 +1829,21 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
+ PGOutputTxnData *txndata = txn->output_plugin_private;
+ bool sent_begin_txn = txndata->sent_begin_txn;
+
Assert(rbtxn_is_streamed(txn));
- OutputPluginUpdateProgress(ctx);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare");
+ return;
+ }
+
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
I notice that the patch skips the stream prepare when the transaction is empty.
This causes an error on the subscriber side when the transaction is later
committed on the publisher side, so I think we'd better not do that.
For example:
(set logical_decoding_work_mem = 64kB, max_prepared_transactions = 10 in
postgresql.conf)
-- publisher
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create publication pub for table test;
-- subscriber
create table test (a int, b text, primary key(a));
create table test2 (a int, b text, primary key(a));
create subscription sub connection 'dbname=postgres port=5432' publication pub with(two_phase=on, streaming=on);
-- publisher
begin;
INSERT INTO test2 SELECT i, md5(i::text) FROM generate_series(1, 1000) s(i);
prepare transaction 't';
commit prepared 't';
The error message in subscriber log:
ERROR: prepared transaction with identifier "pg_gid_16391_722" does not exist
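To spell out why the commit fails, here is a toy standalone model of the subscriber's view. The Subscriber struct and the apply_* functions are hypothetical and only mimic the bookkeeping; they are not the apply worker's code.

```c
#include <stdbool.h>
#include <string.h>

#define MAX_PREPARED 8
#define GID_LEN 64

/* Hypothetical toy model of the prepared transactions known to a subscriber. */
typedef struct Subscriber
{
    char gids[MAX_PREPARED][GID_LEN];
    int  count;
} Subscriber;

/* Record a prepared transaction; false if full or the GID is too long. */
static bool
apply_prepare(Subscriber *sub, const char *gid)
{
    if (sub->count >= MAX_PREPARED || strlen(gid) >= GID_LEN)
        return false;
    strcpy(sub->gids[sub->count++], gid);
    return true;
}

/* Commit a prepared transaction; false if the GID was never prepared here. */
static bool
apply_commit_prepared(Subscriber *sub, const char *gid)
{
    for (int i = 0; i < sub->count; i++)
    {
        if (strcmp(sub->gids[i], gid) == 0)
        {
            /* Remove the entry by moving the last one into its slot. */
            sub->count--;
            if (i != sub->count)
                strcpy(sub->gids[i], sub->gids[sub->count]);
            return true;
        }
    }
    return false;   /* corresponds to "prepared transaction ... does not exist" */
}
```

If the publisher skips the prepare but still sends the commit prepared, the lookup in apply_commit_prepared() finds nothing, which corresponds to the error above.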
Regards,
Shi yu