From 95cdd939b709a2a1f1462b6a92c6cc201a3adde7 Mon Sep 17 00:00:00 2001 From: bdrouvotAWS Date: Tue, 26 Oct 2021 14:35:23 +0000 Subject: [PATCH v25 6/6] Fixing Walsender corner cases with logical decoding on standby. The problem is that WalSndWaitForWal() waits for the *replay* LSN to increase, but gets woken up by walreceiver when new WAL has been flushed. Which means that typically walsenders will get woken up at the same time that the startup process will be - which means that by the time the logical walsender checks GetXLogReplayRecPtr() it's unlikely that the startup process already replayed the record and updated XLogCtl->lastReplayedEndRecPtr. Fixed by making used of a condition variable on the replay position. --- src/backend/access/transam/xlog.c | 20 +++++++++++++---- src/backend/replication/walsender.c | 29 ++++++++++++++++++------- src/backend/utils/activity/wait_event.c | 3 +++ src/include/replication/walsender.h | 12 ++++++++++ src/include/utils/wait_event.h | 1 + 5 files changed, 53 insertions(+), 12 deletions(-) 29.6% src/backend/access/transam/ 52.7% src/backend/replication/ 4.1% src/backend/utils/activity/ 11.7% src/include/replication/ diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1e9f94927c..3d125228fe 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -737,6 +737,7 @@ typedef struct XLogCtlData } XLogCtlData; static XLogCtlData *XLogCtl = NULL; +XLogCtlCvData *XLogCtlCv = NULL; /* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */ static WALInsertLockPadded *WALInsertLocks = NULL; @@ -5182,7 +5183,8 @@ void XLOGShmemInit(void) { bool foundCFile, - foundXLog; + foundXLog, + foundXLogCv; char *allocptr; int i; ControlFileData *localControlFile; @@ -5207,14 +5209,17 @@ XLOGShmemInit(void) XLogCtl = (XLogCtlData *) ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog); + XLogCtlCv = (XLogCtlCvData *) + ShmemInitStruct("XLOG Cv Ctl", sizeof(XLogCtlCvData), &foundXLogCv); + localControlFile = ControlFile; ControlFile = (ControlFileData *) ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile); - if (foundCFile || foundXLog) + if (foundCFile || foundXLog || foundXLogCv) { - /* both should be present or neither */ - Assert(foundCFile && foundXLog); + /* All should be present or neither */ + Assert(foundCFile && foundXLog && foundXLogCv); /* Initialize local copy of WALInsertLocks */ WALInsertLocks = XLogCtl->Insert.WALInsertLocks; @@ -5224,6 +5229,7 @@ XLOGShmemInit(void) return; } memset(XLogCtl, 0, sizeof(XLogCtlData)); + memset(XLogCtlCv, 0, sizeof(XLogCtlCvData)); /* * Already have read control file locally, unless in bootstrap mode. Move @@ -5285,6 +5291,7 @@ XLOGShmemInit(void) SpinLockInit(&XLogCtl->ulsn_lck); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); ConditionVariableInit(&XLogCtl->recoveryNotPausedCV); + ConditionVariableInit(&XLogCtlCv->replayedCV); } /* @@ -7684,6 +7691,11 @@ StartupXLOG(void) XLogCtl->lastReplayedTLI = ThisTimeLineID; SpinLockRelease(&XLogCtl->info_lck); + /* + * wake up walsender(s) used by logical decoding on standby. + */ + ConditionVariableBroadcast(&XLogCtlCv->replayedCV); + /* * If rm_redo called XLogRequestWalReceiverReply, then we wake * up the receiver so that it notices the updated diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2ea4d27b63..8460413030 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1493,6 +1493,7 @@ WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + XLogCtlCvData *xlogctlcv = XLogCtlCv; /* * Fast path to avoid acquiring the spinlock in case we already know we @@ -1511,7 +1512,6 @@ WalSndWaitForWal(XLogRecPtr loc) for (;;) { - long sleeptime; /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1595,20 +1595,33 @@ WalSndWaitForWal(XLogRecPtr loc) WalSndKeepaliveIfNecessary(); /* - * Sleep until something happens or we time out. Also wait for the - * socket becoming writable, if there's still pending output. + * When not in recovery, sleep until something happens or we time out. + * Also wait for the socket becoming writable, if there's still pending output. * Otherwise we might sit on sendable output data while waiting for * new WAL to be generated. (But if we have nothing to send, we don't * want to wake on socket-writable.) */ - sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + if (!RecoveryInProgress()) + { + long sleeptime; + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_SOCKET_READABLE; + wakeEvents = WL_SOCKET_READABLE; - if (pq_is_send_pending()) - wakeEvents |= WL_SOCKET_WRITEABLE; + if (pq_is_send_pending()) + wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + WalSndWait(wakeEvents, sleeptime * 10, WAIT_EVENT_WAL_SENDER_WAIT_WAL); + } + else + /* + * We are in the logical decoding on standby case. + * We are waiting for the startup process to replay wal record(s). + */ + { + ConditionVariablePrepareToSleep(&xlogctlcv->replayedCV); + ConditionVariableSleep(&xlogctlcv->replayedCV, WAIT_EVENT_WAL_SENDER_WAIT_REPLAY); + } } /* reactivate latch so WalSndLoop knows to continue */ diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c index 4a5b7502f5..92dc17baf1 100644 --- a/src/backend/utils/activity/wait_event.c +++ b/src/backend/utils/activity/wait_event.c @@ -448,6 +448,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_WAL_RECEIVER_WAIT_START: event_name = "WalReceiverWaitStart"; break; + case WAIT_EVENT_WAL_SENDER_WAIT_REPLAY: + event_name = "WalReceiverWaitReplay"; + break; case WAIT_EVENT_XACT_GROUP_UPDATE: event_name = "XactGroupUpdate"; break; diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 828106933c..7a2c04c937 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -13,6 +13,7 @@ #define _WALSENDER_H #include +#include "storage/condition_variable.h" /* * What to do with a snapshot in create replication slot command. @@ -48,6 +49,17 @@ extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +/* + * shared-memory state for Condition Variable(s) + * between the startup process and the walsender. + */ +typedef struct XLogCtlCvData +{ + ConditionVariable replayedCV; +} XLogCtlCvData; + +extern XLogCtlCvData *XLogCtlCv; + /* * Remember that we want to wakeup walsenders later * diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h index c22142365f..b1a27f5e84 100644 --- a/src/include/utils/wait_event.h +++ b/src/include/utils/wait_event.h @@ -125,6 +125,7 @@ typedef enum WAIT_EVENT_SYNC_REP, WAIT_EVENT_WAL_RECEIVER_EXIT, WAIT_EVENT_WAL_RECEIVER_WAIT_START, + WAIT_EVENT_WAL_SENDER_WAIT_REPLAY, WAIT_EVENT_XACT_GROUP_UPDATE } WaitEventIPC; -- 2.18.4