From 4665945cba163bcb4beadc6391ee65c755e566d8 Mon Sep 17 00:00:00 2001
From: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Date: Thu, 11 Jan 2024 13:57:53 +0000
Subject: [PATCH v2] Fix race condition in InvalidatePossiblyObsoleteSlot()

In case of an active slot we proceed in 2 steps:
- terminate the backend holding the slot
- report the slot as obsolete

This is racy because between the two we release the mutex on the slot, which
means the effective_xmin and effective_catalog_xmin could advance during that time.

Holding the mutex longer is not an option (given what we'd to do while holding it)
so record the previous LSNs instead that were used during the backend termination.
---
 src/backend/replication/slot.c | 37 ++++++++++++++++++++++++++++------
 1 file changed, 31 insertions(+), 6 deletions(-)
 100.0% src/backend/replication/

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 52da694c79..f136916285 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1352,6 +1352,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 {
 	int			last_signaled_pid = 0;
 	bool		released_lock = false;
+	bool		terminated = false;
+	XLogRecPtr	initial_effective_xmin = InvalidXLogRecPtr;
+	XLogRecPtr	initial_catalog_effective_xmin = InvalidXLogRecPtr;
+	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
+	ReplicationSlotInvalidationCause conflict_prev = RS_INVAL_NONE;
 
 	for (;;)
 	{
@@ -1386,11 +1391,18 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		 */
 		if (s->data.invalidated == RS_INVAL_NONE)
 		{
+			if (!terminated)
+			{
+				initial_restart_lsn = s->data.restart_lsn;
+				initial_effective_xmin = s->effective_xmin;
+				initial_catalog_effective_xmin = s->effective_catalog_xmin;
+			}
+
 			switch (cause)
 			{
 				case RS_INVAL_WAL_REMOVED:
-					if (s->data.restart_lsn != InvalidXLogRecPtr &&
-						s->data.restart_lsn < oldestLSN)
+					if (initial_restart_lsn != InvalidXLogRecPtr &&
+						initial_restart_lsn < oldestLSN)
 						conflict = cause;
 					break;
 				case RS_INVAL_HORIZON:
@@ -1399,12 +1411,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					/* invalid DB oid signals a shared relation */
 					if (dboid != InvalidOid && dboid != s->data.database)
 						break;
-					if (TransactionIdIsValid(s->effective_xmin) &&
-						TransactionIdPrecedesOrEquals(s->effective_xmin,
+					if (TransactionIdIsValid(initial_effective_xmin) &&
+						TransactionIdPrecedesOrEquals(initial_effective_xmin,
 													  snapshotConflictHorizon))
 						conflict = cause;
-					else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
-							 TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
+					else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
+							 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
 														   snapshotConflictHorizon))
 						conflict = cause;
 					break;
@@ -1417,6 +1429,17 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 			}
 		}
 
+		/*
+		 * Check if the conflict cause recorded previously before we terminate
+		 * the process changed now for any reason.
+		 */
+		if (conflict_prev != RS_INVAL_NONE && terminated &&
+			conflict_prev != conflict)
+			elog(PANIC, "conflict cause recorded before terminating process %d has been changed; previous cause: %d, current cause: %d",
+				 last_signaled_pid,
+				 conflict_prev,
+				 conflict);
+
 		/* if there's no conflict, we're done */
 		if (conflict == RS_INVAL_NONE)
 		{
@@ -1499,6 +1522,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 					(void) kill(active_pid, SIGTERM);
 
 				last_signaled_pid = active_pid;
+				terminated = true;
+				conflict_prev = conflict;
 			}
 
 			/* Wait until the slot is released. */
-- 
2.34.1

