From 58ae631010562d094e8e54e4607651b9bce81ae2 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot Date: Mon, 3 Apr 2023 16:46:09 +0000 Subject: [PATCH v58 4/6] Fix walsender corner case with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. This means that walsenders typically get woken up at the same time as the startup process, so by the time the logical walsender checks GetXLogReplayRecPtr() it is unlikely that the startup process has already replayed the record and updated XLogRecoveryCtl->lastReplayedEndRecPtr. Introduce a new "kind" field (of type ReplicationKind) in the WalSnd struct and move the call to WalSndWakeup() within ApplyWalRecord(). The new field lets WalSndWakeup() filter which kind of walsender to wake up, depending on the code path. --- src/backend/access/transam/xlog.c | 6 +++--- src/backend/access/transam/xlogarchive.c | 2 +- src/backend/access/transam/xlogrecovery.c | 10 ++++----- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 24 +++++++++++++++++++-- src/include/replication/walsender.h | 20 ++++++++--------- src/include/replication/walsender_private.h | 3 +++ 7 files changed, 44 insertions(+), 23 deletions(-) 25.0% src/backend/access/transam/ 45.0% src/backend/replication/ 29.9% src/include/replication/ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 779f5c3711..70ac8fc33b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record) END_CRIT_SECTION(); /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); /* * If we still haven't flushed to the request point then we have a @@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void) END_CRIT_SECTION(); /* wake 
up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); + WalSndWakeupProcessRequests(true, !RecoveryInProgress()); /* * Great, done. To take some work off the critical path, try to initialize @@ -5773,7 +5773,7 @@ StartupXLOG(void) * If there were cascading standby servers connected to us, nudge any wal * sender processes to notice that we've been promoted. */ - WalSndWakeup(); + WalSndWakeup(true, true); /* * If this was a promotion, request an (online) checkpoint now. This isn't diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c index a0f5aa24b5..d06fdc74c0 100644 --- a/src/backend/access/transam/xlogarchive.c +++ b/src/backend/access/transam/xlogarchive.c @@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname) * if we restored something other than a WAL segment, but it does no harm * either. */ - WalSndWakeup(); + WalSndWakeup(true, true); } /* diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index dbe9394762..18551cc3b3 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -1935,6 +1935,10 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl XLogRecoveryCtl->lastReplayedTLI = *replayTLI; SpinLockRelease(&XLogRecoveryCtl->info_lck); + /* Wakeup walsender(s) */ + WalSndWakeup(switchedTLI && AllowCascadeReplication(), + switchedTLI || RecoveryInProgress()); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake up the * receiver so that it notices the updated lastReplayedEndRecPtr and sends @@ -1958,12 +1962,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl */ RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI); - /* - * Wake up any walsenders to notice that we are on a new timeline. 
- */ - if (AllowCascadeReplication()) - WalSndWakeup(); - /* Reset the prefetcher. */ XLogPrefetchReconfigure(); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 685af51d5d..d2aa93734c 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli) /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); if (AllowCascadeReplication()) - WalSndWakeup(); + WalSndWakeup(true, !RecoveryInProgress()); /* Report XLOG streaming progress in PS display */ if (update_process_title) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index aeb5f93514..9b68e87ad8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2626,6 +2626,23 @@ InitWalSenderSlot(void) walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; + + /* + * The kind assignment is done here and not in StartReplication() + * and StartLogicalReplication(). Indeed, the logical walsender + * needs to read WAL records (like snapshot of running + * transactions) during slot creation. So it needs to be woken + * up based on its kind. + * + * The kind assignment could also be done in StartReplication(), + * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it + * seems better to set it in one place. + */ + if (MyDatabaseId == InvalidOid) + walsnd->kind = REPLICATION_KIND_PHYSICAL; + else + walsnd->kind = REPLICATION_KIND_LOGICAL; + SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3314,13 +3331,14 @@ WalSndShmemInit(void) * advisable. 
*/ void -WalSndWakeup(void) +WalSndWakeup(bool physical, bool logical) { int i; for (i = 0; i < max_wal_senders; i++) { Latch *latch; + ReplicationKind kind; WalSnd *walsnd = &WalSndCtl->walsnds[i]; /* @@ -3329,9 +3347,11 @@ WalSndWakeup(void) */ SpinLockAcquire(&walsnd->mutex); latch = walsnd->latch; + kind = walsnd->kind; SpinLockRelease(&walsnd->mutex); - if (latch != NULL) + if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) || + (logical && kind == REPLICATION_KIND_LOGICAL))) SetLatch(latch); } } diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 52bb3e2aae..c6e4515201 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); extern void WalSndShmemInit(void); -extern void WalSndWakeup(void); +extern void WalSndWakeup(bool physical, bool logical); extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); @@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void); /* * wakeup walsenders if there is work to be done */ -#define WalSndWakeupProcessRequests() \ - do \ - { \ - if (wake_wal_senders) \ - { \ - wake_wal_senders = false; \ - if (max_wal_senders > 0) \ - WalSndWakeup(); \ - } \ +#define WalSndWakeupProcessRequests(physical, logical) \ + do \ + { \ + if (wake_wal_senders) \ + { \ + wake_wal_senders = false; \ + if (max_wal_senders > 0) \ + WalSndWakeup(physical, logical); \ + } \ } while (0) #endif /* _WALSENDER_H */ diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 5310e054c4..ff25aa70a8 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -15,6 +15,7 @@ #include "access/xlog.h" #include "lib/ilist.h" #include "nodes/nodes.h" +#include 
"nodes/replnodes.h" #include "replication/syncrep.h" #include "storage/latch.h" #include "storage/shmem.h" @@ -79,6 +80,8 @@ typedef struct WalSnd * Timestamp of the last message received from standby. */ TimestampTz replyTime; + + ReplicationKind kind; } WalSnd; extern PGDLLIMPORT WalSnd *MyWalSnd; -- 2.34.1