From ee5c0944effd0634d82d90828c2f749e9abf5fba Mon Sep 17 00:00:00 2001
From: Amit Khandekar <amit.khandekar@enterprisedb.com>
Date: Thu, 16 Jan 2020 10:05:16 +0530
Subject: [PATCH v6 3/5] Handle logical slot conflicts on standby.

During WAL replay on standby, when slot conflict is identified,
drop such slots. Also do the same thing if wal_level on master
is reduced to below logical and there are existing logical slots
on standby. Introduce a new ProcSignalReason value for slot
conflict recovery. Arrange for a new pg_stat_get_activity field:
confl_logicalslot.

Amit Khandekar, reviewed by Andres Freund.
---
 doc/src/sgml/monitoring.sgml         |   6 +
 src/backend/access/gist/gistxlog.c   |   4 +-
 src/backend/access/hash/hash_xlog.c  |   3 +-
 src/backend/access/heap/heapam.c     |  13 +-
 src/backend/access/nbtree/nbtxlog.c  |   4 +-
 src/backend/access/spgist/spgxlog.c  |   1 +
 src/backend/access/transam/xlog.c    |  14 +++
 src/backend/catalog/system_views.sql |   1 +
 src/backend/postmaster/pgstat.c      |   4 +
 src/backend/replication/slot.c       | 176 +++++++++++++++++++++++++++
 src/backend/storage/ipc/procarray.c  |   4 +
 src/backend/storage/ipc/procsignal.c |   3 +
 src/backend/storage/ipc/standby.c    |   4 +-
 src/backend/tcop/postgres.c          |  22 ++++
 src/backend/utils/adt/pgstatfuncs.c  |  16 +++
 src/include/catalog/pg_proc.dat      |   5 +
 src/include/pgstat.h                 |   1 +
 src/include/replication/slot.h       |   2 +
 src/include/storage/procsignal.h     |   1 +
 src/include/storage/standby.h        |   2 +-
 src/test/regress/expected/rules.out  |   1 +
 21 files changed, 278 insertions(+), 9 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 7626987808..aa65478b31 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -2742,6 +2742,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
      <entry>Number of queries in this database that have been canceled due to
       old snapshots</entry>
     </row>
+    <row>
+     <entry><structfield>confl_logicalslot</structfield></entry>
+     <entry><type>bigint</type></entry>
+     <entry>Number of queries in this database that have been canceled due to
+      logical slots</entry>
+    </row>
     <row>
      <entry><structfield>confl_bufferpin</structfield></entry>
      <entry><type>bigint</type></entry>
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 90bc4895b2..289176305a 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -195,7 +195,8 @@ gistRedoDeleteRecord(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
@@ -414,6 +415,7 @@ gistRedoPageReuse(XLogReaderState *record)
 
 			latestRemovedXid = XidFromFullTransactionId(latestRemovedFullXid);
 			ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+												xlrec->onCatalogTable,
 												xlrec->node);
 		}
 	}
diff --git a/src/backend/access/hash/hash_xlog.c b/src/backend/access/hash/hash_xlog.c
index 3c60677662..5767890fa4 100644
--- a/src/backend/access/hash/hash_xlog.c
+++ b/src/backend/access/hash/hash_xlog.c
@@ -1002,7 +1002,8 @@ hash_xlog_vacuum_one_page(XLogReaderState *record)
 		RelFileNode rnode;
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid,
+											xldata->onCatalogTable, rnode);
 	}
 
 	action = XLogReadBufferForRedoExtended(record, 0, RBM_NORMAL, true, &buffer);
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index bf6bfe69dc..440f444dc1 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7703,7 +7703,8 @@ heap_xlog_cleanup_info(XLogReaderState *record)
 	xl_heap_cleanup_info *xlrec = (xl_heap_cleanup_info *) XLogRecGetData(record);
 
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, xlrec->node);
 
 	/*
 	 * Actual operation is a no-op. Record type exists to provide a means for
@@ -7739,7 +7740,8 @@ heap_xlog_clean(XLogReaderState *record)
 	 * latestRemovedXid is invalid, skip conflict processing.
 	 */
 	if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 
 	/*
 	 * If we have a full-page image, restore it (using a cleanup lock) and
@@ -7835,7 +7837,9 @@ heap_xlog_visible(XLogReaderState *record)
 	 * rather than killing the transaction outright.
 	 */
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid,
+											xlrec->onCatalogTable,
+											rnode);
 
 	/*
 	 * Read the heap page, if it still exists. If the heap file has dropped or
@@ -7972,7 +7976,8 @@ heap_xlog_freeze_page(XLogReaderState *record)
 		TransactionIdRetreat(latestRemovedXid);
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
-		ResolveRecoveryConflictWithSnapshot(latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index 99d0914e72..09734b3e94 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -659,7 +659,8 @@ btree_xlog_delete(XLogReaderState *record)
 
 		XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
 
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable, rnode);
 	}
 
 	/*
@@ -935,6 +936,7 @@ btree_xlog_reuse_page(XLogReaderState *record)
 	if (InHotStandby)
 	{
 		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
+											xlrec->onCatalogTable,
 											xlrec->node);
 	}
 }
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index 7be2291d07..0672f994ec 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -881,6 +881,7 @@ spgRedoVacuumRedirect(XLogReaderState *record)
 
 			XLogRecGetBlockTag(record, 0, &node, NULL, NULL);
 			ResolveRecoveryConflictWithSnapshot(xldata->newestRedirectXid,
+												xldata->onCatalogTable,
 												node);
 		}
 	}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index a666b4b935..c679f2d767 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -9942,6 +9942,20 @@ xlog_redo(XLogReaderState *record)
 		/* Update our copy of the parameters in pg_control */
 		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
+		/*
+		 * Drop logical slots if we are in hot standby and the primary does not
+		 * have a WAL level sufficient for logical decoding. No need to search
+		 * for potentially conflicting logically slots if standby is running
+		 * with wal_level lower than logical, because in that case, we would
+		 * have either disallowed creation of logical slots or dropped existing
+		 * ones.
+		 */
+		if (InRecovery && InHotStandby &&
+			xlrec.wal_level < WAL_LEVEL_LOGICAL &&
+			wal_level >= WAL_LEVEL_LOGICAL)
+			ResolveRecoveryConflictWithLogicalSlots(InvalidOid, InvalidTransactionId,
+				gettext_noop("Logical decoding on standby requires wal_level >= logical on master."));
+
 		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
 		ControlFile->MaxConnections = xlrec.MaxConnections;
 		ControlFile->max_worker_processes = xlrec.max_worker_processes;
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index b8a3f46912..471a3e78f9 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -906,6 +906,7 @@ CREATE VIEW pg_stat_database_conflicts AS
             pg_stat_get_db_conflict_tablespace(D.oid) AS confl_tablespace,
             pg_stat_get_db_conflict_lock(D.oid) AS confl_lock,
             pg_stat_get_db_conflict_snapshot(D.oid) AS confl_snapshot,
+            pg_stat_get_db_conflict_logicalslot(D.oid) AS confl_logicalslot,
             pg_stat_get_db_conflict_bufferpin(D.oid) AS confl_bufferpin,
             pg_stat_get_db_conflict_startup_deadlock(D.oid) AS confl_deadlock
     FROM pg_database D;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f9287b7942..0dc26dcb19 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4599,6 +4599,7 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry)
 	dbentry->n_conflict_tablespace = 0;
 	dbentry->n_conflict_lock = 0;
 	dbentry->n_conflict_snapshot = 0;
+	dbentry->n_conflict_logicalslot = 0;
 	dbentry->n_conflict_bufferpin = 0;
 	dbentry->n_conflict_startup_deadlock = 0;
 	dbentry->n_temp_files = 0;
@@ -6223,6 +6224,9 @@ pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			dbentry->n_conflict_snapshot++;
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			dbentry->n_conflict_logicalslot++;
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_BUFFERPIN:
 			dbentry->n_conflict_bufferpin++;
 			break;
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 00aa95ba15..3a18ef052d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,6 +46,7 @@
 #include "pgstat.h"
 #include "replication/slot.h"
 #include "storage/fd.h"
+#include "storage/lock.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
@@ -101,6 +102,7 @@ int			max_replication_slots = 0;	/* the maximum number of replication
 
 static void ReplicationSlotDropAcquired(void);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
+static void ReplicationSlotDropConflicting(ReplicationSlot *slot);
 
 /* internal persistency functions */
 static void RestoreSlotFromDisk(const char *name);
@@ -637,6 +639,64 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
 	LWLockRelease(ReplicationSlotAllocationLock);
 }
 
+/*
+ * Permanently drop a conflicting replication slot. If it's already active by
+ * another backend, send it a recovery conflict signal, and then try again.
+ */
+static void
+ReplicationSlotDropConflicting(ReplicationSlot *slot)
+{
+	pid_t		active_pid;
+	PGPROC	   *proc;
+	VirtualTransactionId	vxid;
+
+	ConditionVariablePrepareToSleep(&slot->active_cv);
+	while (1)
+	{
+		SpinLockAcquire(&slot->mutex);
+		active_pid = slot->active_pid;
+		if (active_pid == 0)
+			active_pid = slot->active_pid = MyProcPid;
+		SpinLockRelease(&slot->mutex);
+
+		/* Drop the acquired slot, unless it is acquired by another backend */
+		if (active_pid == MyProcPid)
+		{
+			elog(DEBUG1, "acquired conflicting slot, now dropping it");
+			ReplicationSlotDropPtr(slot);
+			break;
+		}
+
+		/* Send the other backend, a conflict recovery signal */
+
+		SetInvalidVirtualTransactionId(vxid);
+		LWLockAcquire(ProcArrayLock, LW_SHARED);
+		proc = BackendPidGetProcWithLock(active_pid);
+		if (proc)
+			GET_VXID_FROM_PGPROC(vxid, *proc);
+		LWLockRelease(ProcArrayLock);
+
+		/*
+		 * If coincidently that process finished, some other backend may
+		 * acquire the slot again. So start over again.
+		 * Note: Even if vxid.localTransactionId is invalid, we need to cancel
+		 * that backend, because there is no other way to make it release the
+		 * slot. So don't bother to validate vxid.localTransactionId.
+		 */
+		if (vxid.backendId == InvalidBackendId)
+			continue;
+
+		elog(DEBUG1, "cancelling pid %d (backendId: %d) for releasing slot",
+					 active_pid, vxid.backendId);
+
+		CancelVirtualTransaction(vxid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+		ConditionVariableSleep(&slot->active_cv,
+							   WAIT_EVENT_REPLICATION_SLOT_DROP);
+	}
+
+	ConditionVariableCancelSleep();
+}
+
 /*
  * Serialize the currently acquired slot's state from memory to disk, thereby
  * guaranteeing the current state will survive a crash.
@@ -1083,6 +1143,122 @@ ReplicationSlotReserveWal(void)
 	}
 }
 
+/*
+ * Resolve recovery conflicts with logical slots.
+ *
+ * When xid is valid, it means that rows older than xid might have been
+ * removed. Therefore we need to drop slots that depend on seeing those rows.
+ * When xid is invalid, drop all logical slots. This is required when the
+ * master wal_level is set back to replica, so existing logical slots need to
+ * be dropped. Also, when xid is invalid, a common 'conflict_reason' is
+ * provided for the error detail; otherwise it is NULL, in which case it is
+ * constructed out of the xid value.
+ */
+void
+ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid,
+										char *conflict_reason)
+{
+	int			i;
+	bool		found_conflict = false;
+
+	if (max_replication_slots <= 0)
+		return;
+
+restart:
+	if (found_conflict)
+	{
+		CHECK_FOR_INTERRUPTS();
+		found_conflict = false;
+	}
+
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	for (i = 0; i < max_replication_slots; i++)
+	{
+		ReplicationSlot *s;
+
+		s = &ReplicationSlotCtl->replication_slots[i];
+
+		/* cannot change while ReplicationSlotCtlLock is held */
+		if (!s->in_use)
+			continue;
+
+		/* We are only dealing with *logical* slot conflicts. */
+		if (!SlotIsLogical(s))
+			continue;
+
+		/* Invalid xid means caller is asking to drop all logical slots */
+		if (!TransactionIdIsValid(xid))
+			found_conflict = true;
+		else
+		{
+			TransactionId slot_xmin;
+			TransactionId slot_catalog_xmin;
+			StringInfoData	conflict_str, conflict_xmins;
+			char	   *conflict_sentence =
+				gettext_noop("Slot conflicted with xid horizon which was being increased to");
+
+			/* not our database, skip */
+			if (s->data.database != InvalidOid && s->data.database != dboid)
+				continue;
+
+			SpinLockAcquire(&s->mutex);
+			slot_xmin = s->data.xmin;
+			slot_catalog_xmin = s->data.catalog_xmin;
+			SpinLockRelease(&s->mutex);
+
+			/*
+			 * Build the conflict_str which will look like :
+			 * "Slot conflicted with xid horizon which was being increased
+			 * to 9012 (slot xmin: 1234, slot catalog_xmin: 5678)."
+			 */
+			initStringInfo(&conflict_xmins);
+			if (TransactionIdIsValid(slot_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_xmin, xid))
+			{
+				appendStringInfo(&conflict_xmins, "slot xmin: %d", slot_xmin);
+			}
+			if (TransactionIdIsValid(slot_catalog_xmin) &&
+				TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))
+				appendStringInfo(&conflict_xmins, "%sslot catalog_xmin: %d",
+								 conflict_xmins.len > 0 ? ", " : "",
+								 slot_catalog_xmin);
+
+			if (conflict_xmins.len > 0)
+			{
+				initStringInfo(&conflict_str);
+				appendStringInfo(&conflict_str, "%s %d (%s).",
+								 conflict_sentence, xid, conflict_xmins.data);
+				found_conflict = true;
+				conflict_reason = conflict_str.data;
+			}
+		}
+
+		if (found_conflict)
+		{
+			NameData	slotname;
+
+			SpinLockAcquire(&s->mutex);
+			slotname = s->data.name;
+			SpinLockRelease(&s->mutex);
+
+			/* ReplicationSlotDropConflicting() will acquire the lock below */
+			LWLockRelease(ReplicationSlotControlLock);
+
+			ReplicationSlotDropConflicting(s);
+
+			ereport(LOG,
+					(errmsg("dropped conflicting slot %s", NameStr(slotname)),
+					 errdetail("%s", conflict_reason)));
+
+			/* We released the lock above; so re-scan the slots. */
+			goto restart;
+		}
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+}
+
+
 /*
  * Flush all replication slots to disk.
  *
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index f45a619deb..db232306b8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -2670,6 +2670,10 @@ CancelVirtualTransaction(VirtualTransactionId vxid, ProcSignalReason sigmode)
 
 		GET_VXID_FROM_PGPROC(procvxid, *proc);
 
+		/*
+		 * Note: vxid.localTransactionId can be invalid, which means the
+		 * request is to signal the pid that is not running a transaction.
+		 */
 		if (procvxid.backendId == vxid.backendId &&
 			procvxid.localTransactionId == vxid.localTransactionId)
 		{
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 65d3946386..68c438d0c5 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -558,6 +558,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_SNAPSHOT);
 
+	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT))
+		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT);
+
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK);
 
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 08f695a980..de64eee0e7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -23,6 +23,7 @@
 #include "access/xloginsert.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/slot.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -296,7 +297,8 @@ ResolveRecoveryConflictWithVirtualXIDs(VirtualTransactionId *waitlist,
 }
 
 void
-ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid, RelFileNode node)
+ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
+									bool onCatalogTable, RelFileNode node)
 {
 	VirtualTransactionId *backends;
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 00c77b66c7..c3ad1d5988 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2445,6 +2445,9 @@ errdetail_recovery_conflict(void)
 		case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
 			errdetail("User query might have needed to see row versions that must be removed.");
 			break;
+		case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+			errdetail("User was using the logical slot that must be dropped.");
+			break;
 		case PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK:
 			errdetail("User transaction caused buffer deadlock with recovery.");
 			break;
@@ -2913,6 +2916,25 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
 			case PROCSIG_RECOVERY_CONFLICT_LOCK:
 			case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
 			case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
+			case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
+				/*
+				 * For conflicts that require a logical slot to be dropped, the
+				 * requirement is for the signal receiver to release the slot,
+				 * so that it could be dropped by the signal sender. So for
+				 * normal backends, the transaction should be aborted, just
+				 * like for other recovery conflicts. But if it's walsender on
+				 * standby, then it has to be killed so as to release an
+				 * acquired logical slot.
+				 */
+				if (am_cascading_walsender &&
+					reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
+					MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
+				{
+					RecoveryConflictPending = true;
+					QueryCancelPending = true;
+					InterruptPending = true;
+					break;
+				}
 
 				/*
 				 * If we aren't in a transaction any longer then ignore.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index cea01534a5..b4b1dbc177 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1471,6 +1471,21 @@ pg_stat_get_db_conflict_snapshot(PG_FUNCTION_ARGS)
 	PG_RETURN_INT64(result);
 }
 
+Datum
+pg_stat_get_db_conflict_logicalslot(PG_FUNCTION_ARGS)
+{
+	Oid			dbid = PG_GETARG_OID(0);
+	int64		result;
+	PgStat_StatDBEntry *dbentry;
+
+	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
+		result = 0;
+	else
+		result = (int64) (dbentry->n_conflict_logicalslot);
+
+	PG_RETURN_INT64(result);
+}
+
 Datum
 pg_stat_get_db_conflict_bufferpin(PG_FUNCTION_ARGS)
 {
@@ -1514,6 +1529,7 @@ pg_stat_get_db_conflict_all(PG_FUNCTION_ARGS)
 		result = (int64) (dbentry->n_conflict_tablespace +
 						  dbentry->n_conflict_lock +
 						  dbentry->n_conflict_snapshot +
+						  dbentry->n_conflict_logicalslot +
 						  dbentry->n_conflict_bufferpin +
 						  dbentry->n_conflict_startup_deadlock);
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 7fb574f9dc..12e7be1132 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5323,6 +5323,11 @@
   proname => 'pg_stat_get_db_conflict_snapshot', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_conflict_snapshot' },
+{ oid => '3434',
+  descr => 'statistics: recovery conflicts in database caused by logical replication slot',
+  proname => 'pg_stat_get_db_conflict_logicalslot', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_conflict_logicalslot' },
 { oid => '3068',
   descr => 'statistics: recovery conflicts in database caused by shared buffer pin',
   proname => 'pg_stat_get_db_conflict_bufferpin', provolatile => 's',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 1a19921f80..1f4a000c48 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -604,6 +604,7 @@ typedef struct PgStat_StatDBEntry
 	PgStat_Counter n_conflict_tablespace;
 	PgStat_Counter n_conflict_lock;
 	PgStat_Counter n_conflict_snapshot;
+	PgStat_Counter n_conflict_logicalslot;
 	PgStat_Counter n_conflict_bufferpin;
 	PgStat_Counter n_conflict_startup_deadlock;
 	PgStat_Counter n_temp_files;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 3e95b019b3..200720c589 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -204,4 +204,6 @@ extern void CheckPointReplicationSlots(void);
 
 extern void CheckSlotRequirements(void);
 
+extern void ResolveRecoveryConflictWithLogicalSlots(Oid dboid, TransactionId xid, char *reason);
+
 #endif							/* SLOT_H */
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 90607df106..f400069c78 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -39,6 +39,7 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_TABLESPACE,
 	PROCSIG_RECOVERY_CONFLICT_LOCK,
 	PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
+	PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index cfbe426e5a..7e0bc43ac4 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -28,7 +28,7 @@ extern void InitRecoveryTransactionEnvironment(void);
 extern void ShutdownRecoveryTransactionEnvironment(void);
 
 extern void ResolveRecoveryConflictWithSnapshot(TransactionId latestRemovedXid,
-												RelFileNode node);
+									bool onCatalogTable, RelFileNode node);
 extern void ResolveRecoveryConflictWithTablespace(Oid tsid);
 extern void ResolveRecoveryConflictWithDatabase(Oid dbid);
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index c7304611c3..5ed064627f 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1845,6 +1845,7 @@ pg_stat_database_conflicts| SELECT d.oid AS datid,
     pg_stat_get_db_conflict_tablespace(d.oid) AS confl_tablespace,
     pg_stat_get_db_conflict_lock(d.oid) AS confl_lock,
     pg_stat_get_db_conflict_snapshot(d.oid) AS confl_snapshot,
+    pg_stat_get_db_conflict_logicalslot(d.oid) AS confl_logicalslot,
     pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin,
     pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock
    FROM pg_database d;
-- 
2.20.1

