From f296a5b880a5688656d308300d1015ba0562c1b7 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvherre@alvh.no-ip.org>
Date: Thu, 10 Jun 2021 16:43:43 -0400
Subject: [PATCH 1/2] the code fix

---
 src/backend/replication/slot.c | 234 +++++++++++++++++++++------------
 1 file changed, 147 insertions(+), 87 deletions(-)

diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index c88b803e5d..c36b16d070 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1160,6 +1160,150 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
+ * and mark it invalid, if necessary and possible.
+ *
+ * Returns whether ReplicationSlotControlLock was released (and in that case,
+ * we're not holding the lock at return; otherwise we are).
+ *
+ * This is inherently racy, because we want to release the
+ * LWLock for syscalls, so caller must restart if we return true.
+ */
+static bool
+InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
+{
+	int		last_signaled_pid = 0;
+	bool	released_lock = false;
+
+	for (;;)
+	{
+		XLogRecPtr	restart_lsn;
+		NameData	slotname;
+		int			active_pid = 0;
+
+		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
+
+		if (!s->in_use)
+		{
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+			break;
+		}
+
+		/*
+		 * Check if the slot needs to be invalidated. If it needs to be
+		 * invalidated, and is not currently acquired, acquire it and mark it
+		 * as having been invalidated.  We do this with the spinlock held to
+		 * avoid race conditions -- for example the restart_lsn could move
+		 * forward, or the slot could be dropped.
+		 */
+		SpinLockAcquire(&s->mutex);
+
+		restart_lsn = s->data.restart_lsn;
+
+		/*
+		 * If the slot is already invalid or is fresh enough, we don't need to
+		 * do anything.
+		 */
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+		{
+			SpinLockRelease(&s->mutex);
+			if (released_lock)
+				LWLockRelease(ReplicationSlotControlLock);
+			break;
+		}
+
+		slotname = s->data.name;
+		active_pid = s->active_pid;
+
+		/*
+		 * If the slot can be acquired, do so and mark it invalidated
+		 * immediately.  Otherwise we'll signal the owning process, below, and
+		 * retry.
+		 */
+		if (active_pid == 0)
+		{
+			MyReplicationSlot = s;
+			s->active_pid = MyProcPid;
+			s->data.invalidated_at = restart_lsn;
+			s->data.restart_lsn = InvalidXLogRecPtr;
+		}
+
+		SpinLockRelease(&s->mutex);
+
+		if (active_pid != 0)
+		{
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/*
+			 * Signal to terminate the process that owns the slot.
+			 *
+			 * There is the race condition that other process may own the slot
+			 * after its current owner process is terminated and before this
+			 * process owns it. To handle that, we signal only if the PID of
+			 * the owning process has changed from the previous time. (This
+			 * logic assumes that the same PID is not reused very quickly.)
+			 */
+			if (last_signaled_pid != active_pid)
+			{
+				ereport(LOG,
+						(errmsg("terminating process %d to release replication slot \"%s\"",
+								active_pid, NameStr(slotname))));
+
+				(void) kill(active_pid, SIGTERM);
+				last_signaled_pid = active_pid;
+			}
+
+			/*
+			 * Wait until the slot is released.
+			 *
+			 * Will immediately return in the first iteration, so we can
+			 * recheck the condition before sleeping. That addresses the
+			 * otherwise possible race of the slot already having been
+			 * released.
+			 */
+			ConditionVariableSleep(&s->active_cv,
+								   WAIT_EVENT_REPLICATION_SLOT_DROP);
+
+			/* re-acquire lock and try again */
+			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+			continue;
+		}
+		else
+		{
+			/*
+			 * We hold the slot now and have already invalidated it; flush it
+			 * to ensure that state persists.
+			 *
+			 * Don't want to hold ReplicationSlotControlLock across file
+			 * system operations, so release it now but be sure to tell caller
+			 * to restart from scratch.
+			 */
+			LWLockRelease(ReplicationSlotControlLock);
+			released_lock = true;
+
+			/* Make sure the invalidated state persists across server restart */
+			ReplicationSlotMarkDirty();
+			ReplicationSlotSave();
+			ReplicationSlotRelease();
+
+			ereport(LOG,
+					(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
+							NameStr(slotname),
+							LSN_FORMAT_ARGS(restart_lsn))));
+
+			/* done with this slot for now */
+			break;
+		}
+	}
+
+	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
+
+	return released_lock;
+}
+
 /*
  * Mark any slot that points to an LSN older than the given segment
  * as invalid; it requires WAL that's about to be removed.
@@ -1178,99 +1322,15 @@ restart:
 	for (int i = 0; i < max_replication_slots; i++)
 	{
 		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
-		XLogRecPtr	restart_lsn = InvalidXLogRecPtr;
-		NameData	slotname;
-		int			wspid;
-		int			last_signaled_pid = 0;
 
 		if (!s->in_use)
 			continue;
 
-		SpinLockAcquire(&s->mutex);
-		slotname = s->data.name;
-		restart_lsn = s->data.restart_lsn;
-		SpinLockRelease(&s->mutex);
-
-		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
-			continue;
-		LWLockRelease(ReplicationSlotControlLock);
-		CHECK_FOR_INTERRUPTS();
-
-		/* Get ready to sleep on the slot in case it is active */
-		ConditionVariablePrepareToSleep(&s->active_cv);
-
-		for (;;)
+		if (InvalidatePossiblyObsoleteSlot(s, oldestLSN))
 		{
-			/*
-			 * Try to mark this slot as used by this process.
-			 *
-			 * Note that ReplicationSlotAcquireInternal(SAB_Inquire) should
-			 * not cancel the prepared condition variable if this slot is
-			 * active in other process. Because in this case we have to wait
-			 * on that CV for the process owning the slot to be terminated,
-			 * later.
-			 */
-			wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
-
-			/*
-			 * Exit the loop if we successfully acquired the slot or the slot
-			 * was dropped during waiting for the owning process to be
-			 * terminated. For example, the latter case is likely to happen
-			 * when the slot is temporary because it's automatically dropped
-			 * by the termination of the owning process.
-			 */
-			if (wspid <= 0)
-				break;
-
-			/*
-			 * Signal to terminate the process that owns the slot.
-			 *
-			 * There is the race condition where other process may own the
-			 * slot after the process using it was terminated and before this
-			 * process owns it. To handle this case, we signal again if the
-			 * PID of the owning process is changed than the last.
-			 *
-			 * XXX This logic assumes that the same PID is not reused very
-			 * quickly.
-			 */
-			if (last_signaled_pid != wspid)
-			{
-				ereport(LOG,
-						(errmsg("terminating process %d because replication slot \"%s\" is too far behind",
-								wspid, NameStr(slotname))));
-				(void) kill(wspid, SIGTERM);
-				last_signaled_pid = wspid;
-			}
-
-			ConditionVariableTimedSleep(&s->active_cv, 10,
-										WAIT_EVENT_REPLICATION_SLOT_DROP);
-		}
-		ConditionVariableCancelSleep();
-
-		/*
-		 * Do nothing here and start from scratch if the slot has already been
-		 * dropped.
-		 */
-		if (wspid == -1)
+			/* if the lock was released, start from scratch */
 			goto restart;
-
-		ereport(LOG,
-				(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
-						NameStr(slotname),
-						LSN_FORMAT_ARGS(restart_lsn))));
-
-		SpinLockAcquire(&s->mutex);
-		s->data.invalidated_at = s->data.restart_lsn;
-		s->data.restart_lsn = InvalidXLogRecPtr;
-		SpinLockRelease(&s->mutex);
-
-		/* Make sure the invalidated state persists across server restart */
-		ReplicationSlotMarkDirty();
-		ReplicationSlotSave();
-		ReplicationSlotRelease();
-
-		/* if we did anything, start from scratch */
-		goto restart;
+		}
 	}
 	LWLockRelease(ReplicationSlotControlLock);
 }
-- 
2.20.1

