From eb8bb9eda4c1ea5fe2abe87df66ef2b86cdb2441 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Date: Thu, 11 Jan 2024 13:57:53 +0000
Subject: [PATCH v3] Fix race condition in InvalidatePossiblyObsoleteSlot()

In case of an active slot we proceed in 2 steps:
- terminate the backend holding the slot
- report the slot as obsolete

This is racy because between the two we release the mutex on the slot, which
means the effective_xmin and effective_catalog_xmin could advance during that time.

Holding the mutex longer is not an option (given what we'd to do while holding it)
so record the previous LSNs instead that were used during the backend termination.
---
 src/backend/replication/slot.c | 39 ++++++++++++++++++++++++++++------
 1 file changed, 33 insertions(+), 6 deletions(-)
 100.0% src/backend/replication/

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2180a38063..33f76c4acc 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1454,6 +1454,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 {
 	int			last_signaled_pid = 0;
 	bool		released_lock = false;
+	bool		terminated = false;
+	XLogRecPtr	initial_effective_xmin = InvalidXLogRecPtr;
+	XLogRecPtr	initial_catalog_effective_xmin = InvalidXLogRecPtr;
+	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
+	ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
 
 	for (;;)
 	{
@@ -1488,11 +1493,24 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		 */
 		if (s->data.invalidated == RS_INVAL_NONE)
 		{
+			/*
+			 * We'll release the slot's mutex soon, so it's possible that
+			 * those values change since the process holding the slot has been
+			 * terminated (if any), so record them here to ensure we would
+			 * report the slot as obsolete correctly.
+			 */
+			if (!terminated)
+			{
+				initial_restart_lsn = s->data.restart_lsn;
+				initial_effective_xmin = s->effective_xmin;
+				initial_catalog_effective_xmin = s->effective_catalog_xmin;
+			}
+
 			switch (cause)
 			{
 				case RS_INVAL_WAL_REMOVED:
-					if (s->data.restart_lsn != InvalidXLogRecPtr &&
-						s->data.restart_lsn < oldestLSN)
+					if (initial_restart_lsn != InvalidXLogRecPtr &&
+						initial_restart_lsn < oldestLSN)
 						conflict = cause;
 					break;
 				case RS_INVAL_HORIZON:
@@ -1501,12 +1519,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					/* invalid DB oid signals a shared relation */
 					if (dboid != InvalidOid && dboid != s->data.database)
 						break;
-					if (TransactionIdIsValid(s->effective_xmin) &&
-						TransactionIdPrecedesOrEquals(s->effective_xmin,
+					if (TransactionIdIsValid(initial_effective_xmin) &&
+						TransactionIdPrecedesOrEquals(initial_effective_xmin,
 													  snapshotConflictHorizon))
 						conflict = cause;
-					else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
-							 TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
+					else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
+							 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
 														   snapshotConflictHorizon))
 						conflict = cause;
 					break;
@@ -1519,6 +1537,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			}
 		}
 
+		/*
+		 * Assert that the conflict cause recorded previously before we
+		 * terminate the process did not change now for any reason.
+		 */
+		Assert(!(conflict_prev != RS_INVAL_NONE && terminated &&
+				 conflict_prev != conflict));
+
 		/* if there's no conflict, we're done */
 		if (conflict == RS_INVAL_NONE)
 		{
@@ -1601,6 +1626,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					(void) kill(active_pid, SIGTERM);
 
 				last_signaled_pid = active_pid;
+				terminated = true;
+				conflict_prev = conflict;
 			}
 
 			/* Wait until the slot is released. */
-- 
2.34.1

