From 0add8bb19a4ee83c6a6ec1f313329d737bf304a5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <bossartn@amazon.com>
Date: Sun, 12 Dec 2021 22:07:11 -0800
Subject: [PATCH v6 6/6] Move removal of old logical rewrite mapping files to
 custodian.

If there are many such files to remove, checkpoints can take much
longer.  To avoid this, move this work to the newly-introduced
custodian process.
---
 src/backend/access/heap/rewriteheap.c | 79 +++++++++++++++++++++++----
 src/backend/postmaster/custodian.c    | 44 +++++++++++++++
 src/include/access/rewriteheap.h      |  1 +
 src/include/postmaster/custodian.h    |  5 ++
 4 files changed, 119 insertions(+), 10 deletions(-)

diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index 2a53826736..edeab65e60 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -116,6 +116,7 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/custodian.h"
 #include "replication/logical.h"
 #include "replication/slot.h"
 #include "storage/bufmgr.h"
@@ -1182,7 +1183,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
  * Perform a checkpoint for logical rewrite mappings
  *
  * This serves two tasks:
- * 1) Remove all mappings not needed anymore based on the logical restart LSN
+ * 1) Alert the custodian to remove all mappings not needed anymore based on the
+ *    logical restart LSN
  * 2) Flush all remaining mappings to disk, so that replay after a checkpoint
  *	  only has to deal with the parts of a mapping that have been written out
  *	  after the checkpoint started.
@@ -1210,6 +1212,10 @@ CheckPointLogicalRewriteHeap(void)
 	if (cutoff != InvalidXLogRecPtr && redo < cutoff)
 		cutoff = redo;
 
+	/* let the custodian know what it can remove */
+	CustodianSetLogicalRewriteCutoff(cutoff);
+	RequestCustodian(CUSTODIAN_REMOVE_REWRITE_MAPPINGS);
+
 	mappings_dir = AllocateDir("pg_logical/mappings");
 	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
 	{
@@ -1240,15 +1246,7 @@ CheckPointLogicalRewriteHeap(void)
 
 		lsn = ((uint64) hi) << 32 | lo;
 
-		if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
-		{
-			elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
-			if (unlink(path) < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not remove file \"%s\": %m", path)));
-		}
-		else
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
 		{
 			/* on some operating systems fsyncing a file requires O_RDWR */
 			int			fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
@@ -1286,3 +1284,64 @@ CheckPointLogicalRewriteHeap(void)
 	/* persist directory entries to disk */
 	fsync_fname("pg_logical/mappings", true);
 }
+
+/*
+ * Remove all mappings not needed anymore based on the logical restart LSN saved
+ * by the checkpointer.  We use this saved value instead of calling
+ * ReplicationSlotsComputeLogicalRestartLSN() so that we don't interfere with an
+ * ongoing call to CheckPointLogicalRewriteHeap() that is flushing mappings to
+ * disk.
+ */
+void
+RemoveOldLogicalRewriteMappings(void)
+{
+	XLogRecPtr	cutoff;
+	DIR		   *mappings_dir;
+	struct dirent *mapping_de;
+	char		path[MAXPGPATH + 20];
+	bool		value_set = false;
+
+	cutoff = CustodianGetLogicalRewriteCutoff(&value_set);
+	if (!value_set)
+		return;
+
+	mappings_dir = AllocateDir("pg_logical/mappings");
+	while ((mapping_de = ReadDir(mappings_dir, "pg_logical/mappings")) != NULL)
+	{
+		struct stat statbuf;
+		Oid			dboid;
+		Oid			relid;
+		XLogRecPtr	lsn;
+		TransactionId rewrite_xid;
+		TransactionId create_xid;
+		uint32		hi,
+					lo;
+
+		if (strcmp(mapping_de->d_name, ".") == 0 ||
+			strcmp(mapping_de->d_name, "..") == 0)
+			continue;
+
+		snprintf(path, sizeof(path), "pg_logical/mappings/%s", mapping_de->d_name);
+		if (lstat(path, &statbuf) == 0 && !S_ISREG(statbuf.st_mode))
+			continue;
+
+		/* Skip over files that cannot be ours. */
+		if (strncmp(mapping_de->d_name, "map-", 4) != 0)
+			continue;
+
+		if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
+				   &dboid, &relid, &hi, &lo, &rewrite_xid, &create_xid) != 6)
+			elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
+
+		lsn = ((uint64) hi) << 32 | lo;
+		if (lsn >= cutoff && cutoff != InvalidXLogRecPtr)
+			continue;
+
+		elog(DEBUG1, "removing logical rewrite file \"%s\"", path);
+		if (unlink(path) < 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not remove file \"%s\": %m", path)));
+	}
+	FreeDir(mappings_dir);
+}
diff --git a/src/backend/postmaster/custodian.c b/src/backend/postmaster/custodian.c
index 861de882c6..0ce4edcf61 100644
--- a/src/backend/postmaster/custodian.c
+++ b/src/backend/postmaster/custodian.c
@@ -27,6 +27,7 @@
 
 #include <time.h>
 
+#include "access/rewriteheap.h"
 #include "libpq/pqsignal.h"
 #include "pgstat.h"
 #include "postmaster/custodian.h"
@@ -46,6 +47,9 @@ typedef struct
 {
 	slock_t		cust_lck;
 	int			cust_flags;
+
+	XLogRecPtr  logical_rewrite_mappings_cutoff;    /* can remove older mappings */
+	bool        logical_rewrite_mappings_cutoff_set;
 } CustodianShmemStruct;
 
 static CustodianShmemStruct *CustodianShmem;
@@ -222,6 +226,16 @@ CustodianMain(void)
 		if (flags & CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS)
 			RemoveOldSerializedSnapshots();
 
+		/*
+		 * Remove logical rewrite mapping files that are no longer needed.
+		 *
+		 * It is not important for these to be removed in single-user mode, so
+		 * we don't need any extra handling outside of the custodian process for
+		 * this.
+		 */
+		if (flags & CUSTODIAN_REMOVE_REWRITE_MAPPINGS)
+			RemoveOldLogicalRewriteMappings();
+
 		/* Calculate how long to sleep */
 		end_time = (pg_time_t) time(NULL);
 		elapsed_secs = end_time - start_time;
@@ -274,3 +288,33 @@ RequestCustodian(int flags)
 	if (ProcGlobal->custodianLatch)
 		SetLatch(ProcGlobal->custodianLatch);
 }
+
+/*
+ * Used by CheckPointLogicalRewriteHeap() to tell the custodian which logical
+ * rewrite mapping files it can remove.
+ */
+void
+CustodianSetLogicalRewriteCutoff(XLogRecPtr cutoff)
+{
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	CustodianShmem->logical_rewrite_mappings_cutoff = cutoff;
+	CustodianShmem->logical_rewrite_mappings_cutoff_set = true;
+	SpinLockRelease(&CustodianShmem->cust_lck);
+}
+
+/*
+ * Used by the custodian to determine which logical rewrite mapping files it can
+ * remove.
+ */
+XLogRecPtr
+CustodianGetLogicalRewriteCutoff(bool *value_set)
+{
+	XLogRecPtr  cutoff;
+
+	SpinLockAcquire(&CustodianShmem->cust_lck);
+	cutoff = CustodianShmem->logical_rewrite_mappings_cutoff;
+	*value_set = CustodianShmem->logical_rewrite_mappings_cutoff_set;
+	SpinLockRelease(&CustodianShmem->cust_lck);
+
+	return cutoff;
+}
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 3e27790b3f..61d7aa8ed8 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -53,5 +53,6 @@ typedef struct LogicalRewriteMappingData
  */
 #define LOGICAL_REWRITE_FORMAT "map-%x-%x-%X_%X-%x-%x"
 extern void CheckPointLogicalRewriteHeap(void);
+extern void RemoveOldLogicalRewriteMappings(void);
 
 #endif							/* REWRITE_HEAP_H */
diff --git a/src/include/postmaster/custodian.h b/src/include/postmaster/custodian.h
index 769c07f2c9..1af96ebd02 100644
--- a/src/include/postmaster/custodian.h
+++ b/src/include/postmaster/custodian.h
@@ -12,13 +12,18 @@
 #ifndef _CUSTODIAN_H
 #define _CUSTODIAN_H
 
+#include "access/xlogdefs.h"
+
 extern void CustodianMain(void) pg_attribute_noreturn();
 extern Size CustodianShmemSize(void);
 extern void CustodianShmemInit(void);
 extern void RequestCustodian(int flags);
+extern void CustodianSetLogicalRewriteCutoff(XLogRecPtr cutoff);
+extern XLogRecPtr CustodianGetLogicalRewriteCutoff(bool *value_set);
 
 /* flags for RequestCustodian() */
 #define CUSTODIAN_REMOVE_TEMP_FILES				0x0001
 #define CUSTODIAN_REMOVE_SERIALIZED_SNAPSHOTS	0x0002
+#define CUSTODIAN_REMOVE_REWRITE_MAPPINGS		0x0004
 
 #endif						/* _CUSTODIAN_H */
-- 
2.25.1

