diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8dbcace3f9..786e3ee1ca 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -255,7 +255,7 @@ typedef struct QueueBackendStatus
  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
  * entries of other backends and also change the head pointer. When holding
  * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointer.
+ * can change the tail pointers.
  *
  * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need multiple locks, we first get
@@ -276,6 +276,8 @@ typedef struct AsyncQueueControl
 	QueuePosition head;			/* head points to the next free location */
 	QueuePosition tail;			/* tail must be <= the queue position of every
 								 * listening backend */
+	int			tailPage;		/* oldest unrecycled page; must be <=
+								 * tail.page */
 	BackendId	firstListener;	/* id of first listener, or InvalidBackendId */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
@@ -286,6 +288,7 @@ static AsyncQueueControl *asyncQueueControl;
 
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
+#define QUEUE_TAIL_PAGE				(asyncQueueControl->tailPage)
 #define QUEUE_FIRST_LISTENER		(asyncQueueControl->firstListener)
 #define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
@@ -537,6 +540,7 @@ AsyncShmemInit(void)
 		/* First time through, so initialize it */
 		SET_QUEUE_POS(QUEUE_HEAD, 0, 0);
 		SET_QUEUE_POS(QUEUE_TAIL, 0, 0);
+		QUEUE_TAIL_PAGE = 0;
 		QUEUE_FIRST_LISTENER = InvalidBackendId;
 		asyncQueueControl->lastQueueFillWarn = 0;
 		/* zero'th entry won't be used, but let's initialize it anyway */
@@ -1358,7 +1362,7 @@ asyncQueueIsFull(void)
 	nexthead = QUEUE_POS_PAGE(QUEUE_HEAD) + 1;
 	if (nexthead > QUEUE_MAX_PAGE)
 		nexthead = 0;			/* wrap around */
-	boundary = QUEUE_POS_PAGE(QUEUE_TAIL);
+	boundary = QUEUE_TAIL_PAGE;
 	boundary -= boundary % SLRU_PAGES_PER_SEGMENT;
 	return asyncQueuePagePrecedes(nexthead, boundary);
 }
@@ -1577,7 +1581,7 @@ static double
 asyncQueueUsage(void)
 {
 	int			headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
-	int			tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
+	int			tailPage = QUEUE_TAIL_PAGE;
 	int			occupied;
 
 	occupied = headPage - tailPage;
@@ -2178,7 +2182,23 @@ asyncQueueAdvanceTail(void)
 	/* Restrict task to one backend per cluster; see SimpleLruTruncate(). */
 	LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);
 
-	/* Compute the new tail. */
+	/*
+	 * Compute the new tail.  Pre-v13, it's essential that QUEUE_TAIL be exact
+	 * (ie, exactly match at least one backend's queue position), so it must
+	 * be updated atomically with the actual computation.  Since v13, we could
+	 * get away with not doing it like that, but it seems prudent to keep it
+	 * so.
+	 *
+	 * Also, because incoming backends will scan forward from QUEUE_TAIL, that
+	 * must be advanced before we can truncate any data.  Thus, QUEUE_TAIL is
+	 * the logical tail, while QUEUE_TAIL_PAGE is the oldest not-yet-truncated
+	 * page.  When QUEUE_TAIL_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL), there are
+	 * pages that we could truncate but have not yet truncated.
+	 *
+	 * For concurrency's sake, we don't want to hold NotifyQueueLock while
+	 * performing SimpleLruTruncate.  This is OK because no backend will try
+	 * to access the pages we are in the midst of truncating.
+	 */
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
 	min = QUEUE_HEAD;
 	for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
@@ -2186,7 +2206,8 @@ asyncQueueAdvanceTail(void)
 		Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
 		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
 	}
-	oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
+	QUEUE_TAIL = min;
+	oldtailpage = QUEUE_TAIL_PAGE;
 	LWLockRelease(NotifyQueueLock);
 
 	/*
@@ -2205,16 +2226,16 @@ asyncQueueAdvanceTail(void)
 		 * release the lock again.
 		 */
 		SimpleLruTruncate(NotifyCtl, newtailpage);
-	}
 
-	/*
-	 * Advertise the new tail.  This changes asyncQueueIsFull()'s verdict for
-	 * the segment immediately prior to the new tail, allowing fresh data into
-	 * that segment.
-	 */
-	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	QUEUE_TAIL = min;
-	LWLockRelease(NotifyQueueLock);
+		/*
+		 * Update QUEUE_TAIL_PAGE.  This changes asyncQueueIsFull()'s verdict
+		 * for the segment immediately prior to the new tail, allowing fresh
+		 * data into that segment.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+		QUEUE_TAIL_PAGE = newtailpage;
+		LWLockRelease(NotifyQueueLock);
+	}
 
 	LWLockRelease(NotifyQueueTailLock);
 }
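
Illustration only, not part of the diff: pulling the three asyncQueueAdvanceTail() hunks together, the ordering this patch establishes is roughly the following. min, oldtailpage, and newtailpage are the function's existing local variables, computed in code the hunks do not show.

	LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE);	/* one truncator at a time */

	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
	QUEUE_TAIL = min;				/* logical tail first: arriving listeners
							 * must not start scanning in pages that are
							 * about to be truncated */
	oldtailpage = QUEUE_TAIL_PAGE;			/* oldest not-yet-truncated page */
	LWLockRelease(NotifyQueueLock);

	if (/* tail advanced across an SLRU segment boundary; test not shown */)
	{
		SimpleLruTruncate(NotifyCtl, newtailpage);	/* NotifyQueueLock not held */

		LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
		QUEUE_TAIL_PAGE = newtailpage;		/* only now does asyncQueueIsFull()
							 * admit fresh data into the freed
							 * segment */
		LWLockRelease(NotifyQueueLock);
	}

	LWLockRelease(NotifyQueueTailLock);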
