From bb2c2c4ecca51d2675f2e5c4d6ac3490995be2b0 Mon Sep 17 00:00:00 2001
From: Andres Freund <andres@anarazel.de>
Date: Wed, 7 Apr 2021 19:01:03 -0700
Subject: [PATCH 1/2] WIP: Sketch for a fix for
 InvalidateObsoleteReplicationSlots().

---
 src/backend/replication/slot.c | 226 ++++++++++++++++++++-------------
 1 file changed, 139 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 75a087c2f9d..5864b9b0139 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1153,6 +1153,140 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots: invalidate slot 's' if its
+ * restart_lsn is older than oldestLSN, first terminating any process that
+ * has the slot acquired. Must be called with ReplicationSlotControlLock held
+ * in shared mode. Returns whether that lock was released; if so, it is not
+ * held on return and the caller has to restart its scan from scratch.
+ */
+static bool
+InvalidateObsoleteReplicationSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+	int		last_signaled_pid = 0;
+	bool	released_lock = false;
+
+	while (true)
+	{
+		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
+		bool		slot_conflicts = false;
+		NameData	slotname;
+		int			active_pid = 0;
+
+		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+		CHECK_FOR_INTERRUPTS();
+
+		/*
+		 * Check if the slot needs to be invalidated. If it needs to be
+		 * invalidated, and is not currently acquired, acquire it and mark it
+		 * as having been invalidated. We do all of this with the spinlock
+		 * held - otherwise there would be race conditions (e.g. the slot's
+		 * restart_lsn moving ahead, the slot concurrently being dropped after
+		 * we release ReplicationSlotControlLock, ...).
+		 */
+		if (s->in_use)		/* may have been dropped while we waited unlocked */
+		{
+			SpinLockAcquire(&s->mutex);
+
+			restart_lsn = s->data.restart_lsn;
+
+			/* check if slot needs to be invalidated */
+			if (!XLogRecPtrIsInvalid(restart_lsn) && restart_lsn < oldestLSN)
+			{
+				slot_conflicts = true;
+				slotname = s->data.name;
+				active_pid = s->active_pid;
+
+				/* check if we can acquire it */
+				if (active_pid == 0)
+				{
+					MyReplicationSlot = s;
+					s->active_pid = MyProcPid;
+					s->data.invalidated_at = restart_lsn;
+					s->data.restart_lsn = InvalidXLogRecPtr;
+				}
+			}
+
+			SpinLockRelease(&s->mutex);
+		}
+
+		if (!slot_conflicts)
+		{
+			Assert(active_pid == 0);
+			/* done; never return holding a lock we report as released */
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+			break;
+		}
+		else if (active_pid != 0)
+		{
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/*
+			 * Signal to terminate the process that owns the slot. Another
+			 * process may acquire the slot between the owner's exit and our
+			 * next attempt, so signal again whenever the owning PID differs
+			 * from the one signaled last.
+			 *
+			 * XXX This logic assumes that the same PID is not reused
+			 * very quickly.
+			 */
+			if (last_signaled_pid != active_pid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+								active_pid, NameStr(slotname))));
+				(void) kill(active_pid, SIGTERM);
+				last_signaled_pid = active_pid;
+			}
+
+			/*
+			 * Wait until the slot is released. Will immediately return in the
+			 * first iteration, so we can recheck the condition before
+			 * sleeping. That addresses the otherwise possible race of the
+			 * slot already having been released.
+			 */
+			ConditionVariableTimedSleep(&s->active_cv, 10,
+										WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+			/* re-acquire for next loop iteration */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+		}
+		else
+		{
+			/*
+			 * Don't want to hold ReplicationSlotControlLock across file
+			 * system operations. Now that we (temporarily) acquired the slot,
+			 * that's safe, as long as we afterwards restart the scan from
+			 * scratch.
+			 */
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+			ReplicationSlotRelease();
+
+			ereport(LOG,
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+							NameStr(slotname),
+							LSN_FORMAT_ARGS(restart_lsn))));
+
+			break;
+		}
+	}
+
+	/* we may still be registered on the slot's CV from the wait above */
+	ConditionVariableCancelSleep();
+
+	Assert(!released_lock == LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+	return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1171,99 +1305,17 @@ restart:
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
-		NameData	slotname;
-		int		wspid;
-		int		last_signaled_pid = 0;
+
+		CHECK_FOR_INTERRUPTS();
 
 		if (!s->in_use)
 			continue;
 
-		SpinLockAcquire(&s->mutex);
-		slotname = s->data.name;
-		restart_lsn = s->data.restart_lsn;
-		SpinLockRelease(&s->mutex);
-
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-			continue;
-		LWLockRelease(ReplicationSlotControlLock);
-		CHECK_FOR_INTERRUPTS();
-
-		/* Get ready to sleep on the slot in case it is active */
-		ConditionVariablePrepareToSleep(&s->active_cv);
-
-		for (;;)
+		if (InvalidateObsoleteReplicationSlot(s, oldestLSN))
 		{
-			/*
-			 * Try to mark this slot as used by this process.
-			 *
-			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
-			 * should not cancel the prepared condition variable
-			 * if this slot is active in other process. Because in this case
-			 * we have to wait on that CV for the process owning
-			 * the slot to be terminated, later.
-			 */
-			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-			/*
-			 * Exit the loop if we successfully acquired the slot or
-			 * the slot was dropped during waiting for the owning process
-			 * to be terminated. For example, the latter case is likely to
-			 * happen when the slot is temporary because it's automatically
-			 * dropped by the termination of the owning process.
-			 */
-			if (wspid <= 0)
-				break;
-
-			/*
-			 * Signal to terminate the process that owns the slot.
-			 *
-			 * There is the race condition where other process may own
-			 * the slot after the process using it was terminated and before
-			 * this process owns it. To handle this case, we signal again
-			 * if the PID of the owning process is changed than the last.
-			 *
-			 * XXX This logic assumes that the same PID is not reused
-			 * very quickly.
-			 */
-			if (last_signaled_pid != wspid)
-			{
-				ereport(LOG,
-						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-								wspid, NameStr(slotname))));
-				(void) kill(wspid, SIGTERM);
-				last_signaled_pid = wspid;
-			}
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
-
-		/*
-		 * Do nothing here and start from scratch if the slot has
-		 * already been dropped.
-		 */
-		if (wspid == -1)
+			/* if the lock was released, we need to restart from scratch */
 			goto restart;
-
-		ereport(LOG,
-				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-						NameStr(slotname),
-						LSN_FORMAT_ARGS(restart_lsn))));
-
-		SpinLockAcquire(&s->mutex);
-		s->data.invalidated_at = s->data.restart_lsn;
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-
-		/* Make sure the invalidated state persists across server restart */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-		ReplicationSlotRelease();
-
-		/* if we did anything, start from scratch */
-		goto restart;
+		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 }
-- 
2.31.0.121.g9198c13e34

