From: | "kuroda(dot)hayato(at)fujitsu(dot)com" <kuroda(dot)hayato(at)fujitsu(dot)com> |
---|---|
To: | "wangw(dot)fnst(at)fujitsu(dot)com" <wangw(dot)fnst(at)fujitsu(dot)com> |
Cc: | "tanghy(dot)fnst(at)fujitsu(dot)com" <tanghy(dot)fnst(at)fujitsu(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Simon Riggs <simon(dot)riggs(at)enterprisedb(dot)com>, Petr Jelinek <petr(dot)jelinek(at)enterprisedb(dot)com>, Fabrice Chapuis <fabrice636861(at)gmail(dot)com> |
Subject: | RE: Logical replication timeout problem |
Date: | 2022-02-08 09:18:28 |
Message-ID: | TYAPR01MB5866BD2248EF82FF432FE599F52D9@TYAPR01MB5866.jpnprd01.prod.outlook.com |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-hackers |
Dear Wang,
Thank you for making a patch.
I applied your patch and confirmed that codes passed regression test.
I put a short reviewing:
```
+ static int skipped_changes_count = 0;
+ /*
+ * Conservatively, at least 150,000 changes can be skipped in 1s.
+ *
+ * Because we use half of wal_sender_timeout as the threshold, and the unit
+ * of wal_sender_timeout in process is ms, the final threshold is
+ * wal_sender_timeout * 75.
+ */
+ int skipped_changes_threshold = wal_sender_timeout * 75;
```
I'm not sure but could you tell me the background of this calculation?
Is this assumption reasonable?
```
@@ -654,20 +663,62 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
{
case REORDER_BUFFER_CHANGE_INSERT:
if (!relentry->pubactions.pubinsert)
+ {
+ if (++skipped_changes_count >= skipped_changes_threshold)
+ {
+ OutputPluginUpdateProgress(ctx, true);
+
+ /*
+ * After sending keepalive message, reset
+ * skipped_changes_count.
+ */
+ skipped_changes_count = 0;
+ }
return;
+ }
break;
```
Is the if-statement needed? In the walsender case OutputPluginUpdateProgress() leads WalSndUpdateProgress(),
and the function also has the threshold for ping-ing.
```
static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool send_keep_alive)
{
- static TimestampTz sendTime = 0;
+ static TimestampTz trackTime = 0;
TimestampTz now = GetCurrentTimestamp();
+ if (send_keep_alive)
+ {
+ /*
+ * If half of wal_sender_timeout has lapsed without send message standby,
+ * send a keep-alive message to the standby.
+ */
+ static TimestampTz sendTime = 0;
+ TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+ wal_sender_timeout / 2);
+ if (now >= ping_time)
+ {
+ WalSndKeepalive(false);
+
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ WalSndShutdown();
+ sendTime = now;
+ }
+ }
+
```
* +1 about renaming to trackTime.
* `/2` might be magic number. How about following? Renaming is very welcome:
```
+#define WALSND_LOGICAL_PING_FACTOR 0.5
+ static TimestampTz sendTime = 0;
+ TimestampTz ping_time = TimestampTzPlusMilliseconds(sendTime,
+ wal_sender_timeout * WALSND_LOGICAL_PING_FACTOR)
```
Could you add a commitfest entry for cfbot?
Best Regards,
Hayato Kuroda
FUJITSU LIMITED
From | Date | Subject | |
---|---|---|---|
Next Message | Ken Kato | 2022-02-08 09:43:18 | Re: [PATCH] Add min() and max() aggregate functions for xid8 |
Previous Message | Bharath Rupireddy | 2022-02-08 09:03:01 | Re: Add checkpoint and redo LSN to LogCheckpointEnd log message |