From ecee845d049e8a7e939fe3b7bc9807ecc1b0a2c7 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryzbyj@telsasoft.com>
Date: Sun, 14 Mar 2021 17:12:07 -0500
Subject: [PATCH v9 9/9] Use GUC hooks to support compression 'level'

---
 src/backend/access/transam/xlog.c       |  19 +++-
 src/backend/access/transam/xloginsert.c |   7 +-
 src/backend/access/transam/xlogreader.c | 120 ++++++++++++++++++++----
 src/backend/utils/misc/guc.c            |  20 ++--
 src/include/access/xlog.h               |  10 +-
 src/include/access/xlogreader.h         |   2 +
 6 files changed, 144 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 599381337e..4ec688a612 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -98,7 +98,9 @@ char	   *XLogArchiveCommand = NULL;
 bool		EnableHotStandby = false;
 bool		fullPageWrites = true;
 bool		wal_log_hints = false;
-int		wal_compression = WAL_COMPRESSION_ZSTD;
+char		*wal_compression_string = ""; /* Overwritten by GUC */
+int			wal_compression = WAL_COMPRESSION_ZSTD;
+int			wal_compression_level = 1;
 char	   *wal_consistency_checking_string = NULL;
 bool	   *wal_consistency_checking = NULL;
 bool		wal_init_zero = true;
@@ -10603,6 +10605,21 @@ assign_xlog_sync_method(int new_sync_method, void *extra)
 	}
 }
 
+bool
+check_wal_compression(char **newval, void **extra, GucSource source)
+{
+	int tmp;
+	return get_compression_level(*newval, &tmp) != -1;
+}
+
+/* Parse the GUC into integers for wal_compression and wal_compression_level */
+void
+assign_wal_compression(const char *newval, void *extra)
+{
+	wal_compression = get_compression_level(newval, &wal_compression_level);
+	Assert(wal_compression >= 0);
+}
+
 
 /*
  * Issue appropriate kind of fsync (if any) for an XLOG output file.
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 96f497d5d6..16dab2e5a6 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -895,7 +895,7 @@ XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
 		{
 			unsigned long	len_l = COMPRESS_BUFSIZE;
 			int ret;
-			ret = compress2((Bytef*)dest, &len_l, (Bytef*)source, orig_len, 1);
+			ret = compress2((Bytef*)dest, &len_l, (Bytef*)source, orig_len, wal_compression_level);
 			if (ret != Z_OK)
 				len_l = -1;
 			len = len_l;
@@ -905,7 +905,7 @@ XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
 
 #ifdef USE_LZ4
 	case WAL_COMPRESSION_LZ4:
-		len = LZ4_compress_fast(source, dest, orig_len, COMPRESS_BUFSIZE, 1);
+		len = LZ4_compress_fast(source, dest, orig_len, COMPRESS_BUFSIZE, wal_compression_level);
 		if (len == 0)
 			len = -1;
 		break;
@@ -913,8 +913,7 @@ XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
 
 #ifdef USE_ZSTD
 	case WAL_COMPRESSION_ZSTD:
-		len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-				ZSTD_CLEVEL_DEFAULT);
+		len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len, wal_compression_level);
 		if (ZSTD_isError(len))
 			len = -1;
 		break;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 1b13d1f660..a33f13b6cd 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -26,6 +26,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "replication/origin.h"
+#include "utils/builtins.h"
 #include "utils/guc.h"
 
 #ifndef FRONTEND
@@ -63,35 +64,41 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
 
-/*
- * Accept the likely variants for none and pglz, for compatibility with old
- * server versions where wal_compression was a boolean.
- */
-const struct config_enum_entry wal_compression_options[] = {
+static const struct {
+	char *name;
+	enum WalCompression compress_id; /* The internal ID (which includes the compression level) */
+	bool has_level; /* If it accepts a numeric "level" */
+	int min_level, dfl_level, max_level;
+} wal_compression_options[] = {
+	/*
+	 * Accept the likely variants for none and pglz, for compatibility with old
+	 * server versions where wal_compression was a boolean.
+	 */
 	{"off", WAL_COMPRESSION_NONE, false},
 	{"none", WAL_COMPRESSION_NONE, false},
-	{"false", WAL_COMPRESSION_NONE, true},
-	{"no", WAL_COMPRESSION_NONE, true},
-	{"0", WAL_COMPRESSION_NONE, true},
+	{"false", WAL_COMPRESSION_NONE, false},
+	{"no", WAL_COMPRESSION_NONE, false},
+	{"0", WAL_COMPRESSION_NONE, false},
+
 	{"pglz", WAL_COMPRESSION_PGLZ, false},
-	{"true", WAL_COMPRESSION_PGLZ, true},
-	{"yes", WAL_COMPRESSION_PGLZ, true},
-	{"on", WAL_COMPRESSION_PGLZ, true},
-	{"1", WAL_COMPRESSION_PGLZ, true},
+	{"true", WAL_COMPRESSION_PGLZ, false},
+	{"yes", WAL_COMPRESSION_PGLZ, false},
+	{"on", WAL_COMPRESSION_PGLZ, false},
+	{"1", WAL_COMPRESSION_PGLZ, false},
 
 #ifdef  HAVE_LIBZ
-	{"zlib", WAL_COMPRESSION_ZLIB, false},
+	{"zlib", WAL_COMPRESSION_ZLIB, true, 0, 1, 9},
 #endif
 
 #ifdef  USE_LZ4
-	{"lz4", WAL_COMPRESSION_LZ4, false},
+	{"lz4", WAL_COMPRESSION_LZ4, true, 1, 1, 65537},
 #endif
 
 #ifdef  USE_ZSTD
-	{"zstd", WAL_COMPRESSION_ZSTD, false},
+	{"zstd-fast", WAL_COMPRESSION_ZSTD, true, -50, -10, -1 }, /* Must be before zstd... */
+	{"zstd", WAL_COMPRESSION_ZSTD, true, -50, ZSTD_CLEVEL_DEFAULT, 10},
 #endif
 
-	{NULL, 0, false}
 };
 
 /*
@@ -1578,6 +1585,83 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
 	}
 }
 
+/*
+ * Return the wal compression ID, or -1 if the input is
+ * invalid/unrecognized/unsupported.
+ * The compression level is stored in *level.
+ */
+int
+get_compression_level(const char *in, int *level)
+{
+	for (int idx=0; idx < lengthof(wal_compression_options); ++idx)
+	{
+		int len;
+		int tmp;
+		char *end;
+
+		if (strcmp(in, wal_compression_options[idx].name) == 0)
+		{
+			/* it has no -level suffix */
+			*level = wal_compression_options[idx].dfl_level;
+			return wal_compression_options[idx].compress_id;
+		}
+
+		len = strlen(wal_compression_options[idx].name);
+		if (strncmp(in, wal_compression_options[idx].name, len) != 0)
+			continue;
+		if (in[len] != '-')
+			continue;
+
+		/* it has a -level suffix, but level is not allowed */
+		if (!wal_compression_options[idx].has_level)
+		{
+#ifndef FRONTEND
+			GUC_check_errdetail("Compression method does not accept a compression level");
+#endif
+			return -1;
+		}
+
+		in += len + 1;
+		len = strlen(in);
+		errno = 0;
+		tmp = strtol(in, &end, 0);
+		if (end != in+len || end == in ||
+				(errno != 0 && tmp == 0) ||
+				(errno == ERANGE && (tmp == LONG_MIN || tmp == LONG_MAX)))
+		{
+#ifndef FRONTEND
+			GUC_check_errdetail("Could not parse compression level: %s", in);
+#endif
+			return -1;
+		}
+
+		/*
+		 * For convenience, allow specification of zstd-fast-N, which is
+		 * interpretted as a negative compression level.
+		 */
+		if (strncmp(wal_compression_options[idx].name, "zstd-fast", 9) == 0 &&
+				tmp > 0)
+				tmp = -tmp;
+
+		if (tmp < wal_compression_options[idx].min_level ||
+				tmp > wal_compression_options[idx].max_level)
+		{
+#ifndef FRONTEND
+			GUC_check_errdetail("Compression level is outside of allowed range: %d...%d",
+					wal_compression_options[idx].min_level,
+					wal_compression_options[idx].max_level);
+#endif
+			return -1;
+		}
+
+		*level = tmp;
+		return wal_compression_options[idx].compress_id;
+	}
+
+	return -1;
+}
+
+
 /*
  * Return a statically allocated string associated with the given compression
  * method.
@@ -1585,9 +1669,9 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
 const char *
 wal_compression_name(WalCompression compression)
 {
-	for (int i=0; wal_compression_options[i].name != NULL; ++i)
+	for (int i=0; i < lengthof(wal_compression_options); ++i)
 	{
-		if (wal_compression_options[i].val == compression)
+		if (wal_compression_options[i].compress_id == compression)
 			return wal_compression_options[i].name;
 	}
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index f37251a27f..29cb12c8a5 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4556,6 +4556,16 @@ static struct config_string ConfigureNamesString[] =
 		check_wal_consistency_checking, assign_wal_consistency_checking, NULL
 	},
 
+	{
+		{"wal_compression", PGC_SUSET, WAL_SETTINGS,
+			gettext_noop("Set the method used to compress full page images in the WAL."),
+			NULL
+		},
+		&wal_compression_string,
+		"zstd",
+		check_wal_compression, assign_wal_compression, NULL
+	},
+
 	{
 		{"jit_provider", PGC_POSTMASTER, CLIENT_CONN_PRELOAD,
 			gettext_noop("JIT provider to use."),
@@ -4816,16 +4826,6 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
-	{
-		{"wal_compression", PGC_SUSET, WAL_SETTINGS,
-			gettext_noop("Set the method used to compress full page images in the WAL."),
-			NULL
-		},
-		&wal_compression,
-		WAL_COMPRESSION_ZSTD, wal_compression_options,
-		NULL, NULL, NULL
-	},
-
 	{
 		{"dynamic_shared_memory_type", PGC_POSTMASTER, RESOURCES_MEM,
 			gettext_noop("Selects the dynamic shared memory implementation used."),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e8b2c53784..7a05838d0b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -19,6 +19,7 @@
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
 #include "storage/fd.h"
+#include "utils/guc.h"
 
 
 /* Sync methods */
@@ -116,7 +117,6 @@ extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool wal_log_hints;
-extern int wal_compression;
 extern bool wal_init_zero;
 extern bool wal_recycle;
 extern bool *wal_consistency_checking;
@@ -143,6 +143,9 @@ extern char *PromoteTriggerFile;
 extern RecoveryTargetTimeLineGoal recoveryTargetTimeLineGoal;
 extern TimeLineID recoveryTargetTLIRequested;
 extern TimeLineID recoveryTargetTLI;
+extern char *wal_compression_string;
+extern int wal_compression;
+extern int wal_compression_level;
 
 extern int	CheckPointSegments;
 
@@ -361,6 +364,11 @@ extern void XLogRequestWalReceiverReply(void);
 extern void assign_max_wal_size(int newval, void *extra);
 extern void assign_checkpoint_completion_target(double newval, void *extra);
 
+/* GUC */
+extern bool check_wal_compression(char **newval, void **extra, GucSource source);
+extern void assign_wal_compression(const char *newval, void *extra);
+
+
 /*
  * Routines to start, stop, and get status of a base backup.
  */
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 21d200d3df..b4d0ab4517 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -327,4 +327,6 @@ extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
 							   RelFileNode *rnode, ForkNumber *forknum,
 							   BlockNumber *blknum);
 
+extern int get_compression_level(const char *in, int *level);
+
 #endif							/* XLOGREADER_H */
-- 
2.17.0

