From f1579d37a9f293d7cc911ea048b68d3270b2cdf5 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@otacoo.com>
Date: Wed, 10 Dec 2014 22:10:16 +0900
Subject: [PATCH] Prototype to support record-level compression

This will be enough for tests with compression.
---
 src/backend/access/transam/xlog.c       |  1 +
 src/backend/access/transam/xloginsert.c | 64 +++++++++++++++++++++++++++++++++
 src/backend/access/transam/xlogreader.c | 17 +++++++++
 src/backend/utils/misc/guc.c            | 10 ++++++
 src/include/access/xlog.h               |  1 +
 src/include/access/xlogrecord.h         |  5 +++
 6 files changed, 98 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0f09add..a0e15be 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -88,6 +88,7 @@ char	   *XLogArchiveCommand = NULL;
 bool		EnableHotStandby = false;
 bool		fullPageWrites = true;
 bool		wal_log_hints = false;
+bool		wal_compression = false;
 bool		log_checkpoints = false;
 int			sync_method = DEFAULT_SYNC_METHOD;
 int			wal_level = WAL_LEVEL_MINIMAL;
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index f3d610f..a395842 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -29,6 +29,7 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 #include "pg_trace.h"
+#include "utils/pg_lzcompress.h"
 
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
@@ -56,6 +57,9 @@ static registered_buffer *registered_buffers;
 static int	max_registered_buffers;		/* allocated size */
 static int	max_registered_block_id = 0;		/* highest block_id + 1
 												 * currently registered */
+static char *compressed_buffer = NULL;
+static char *uncompressed_buffer = NULL;
+static XLogRecData *compressed_rdata = NULL;
 
 /*
  * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
@@ -455,6 +459,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
 	char	   *scratch = hdr_scratch;
+	bool		is_compressed = false;
 
 	/*
 	 * Note: this function can be called multiple times for the same record.
@@ -653,6 +658,57 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	total_len += hdr_rdt.len;
 
 	/*
+	 * Compress whole record content here, except the record header. After doing
+	 * compression, the record is made of two entries:
+	 * - record header
+	 * - compressed entry containing all the other entries after XLogRecord that
+	 * has a fixed size of SizeOfXLogRecord.
+	 * Compression is not done if record has a length higher than 4 * BLCKSZ.
+	 */
+	if (wal_compression && (total_len - SizeOfXLogRecord) < 4 * BLCKSZ)
+	{
+		XLogRecData	   *rdata = hdr_rdt.next;
+		uint32			position = 0;
+
+		/*
+		 * Save data first for all the block headers if any.
+		 */
+		if (hdr_rdt.len > SizeOfXLogRecord)
+		{
+			memcpy(uncompressed_buffer, hdr_rdt.data + SizeOfXLogRecord,
+				   hdr_rdt.len - SizeOfXLogRecord);
+			position += hdr_rdt.len - SizeOfXLogRecord;
+		}
+
+		/* Now add all the other entries except header*/
+		for (; rdata != NULL; rdata = rdata->next)
+		{
+			memcpy(uncompressed_buffer + position, rdata->data, rdata->len);
+			position += rdata->len;
+		}
+		Assert(position == total_len - SizeOfXLogRecord);
+
+		/* do the compression */
+		if (pglz_compress(uncompressed_buffer, position,
+						  (PGLZ_Header *) compressed_buffer,
+						  PGLZ_strategy_default) == PGLZ_OK)
+		{
+			uint32 compressed_len = VARSIZE((struct varlena *) compressed_buffer);
+
+			/* aave former length, used when decoding record for some checks */
+			compressed_rdata->next = NULL;
+			compressed_rdata->len = compressed_len;
+			compressed_rdata->data = compressed_buffer;
+
+			/* Reassemble record and calculate new record length */
+			hdr_rdt.next = compressed_rdata;
+			hdr_rdt.len = SizeOfXLogRecord;
+			total_len = compressed_len + SizeOfXLogRecord;
+			is_compressed = true;
+		}
+	}
+
+	/*
 	 * Calculate CRC of the data
 	 *
 	 * Note that the record header isn't added into the CRC initially since we
@@ -673,6 +729,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_xid = GetCurrentTransactionIdIfAny();
 	rechdr->xl_tot_len = total_len;
 	rechdr->xl_info = info;
+	if (is_compressed)
+		rechdr->xl_info |= XLR_COMPRESSED;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
 	rechdr->xl_crc = rdata_crc;
@@ -886,6 +944,12 @@ InitXLogInsert(void)
 									sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
 		max_rdatas = XLR_NORMAL_RDATAS;
 	}
+	if (compressed_buffer == NULL)
+		compressed_buffer = (char *) palloc(PGLZ_MAX_OUTPUT(4 * BLCKSZ));
+	if (uncompressed_buffer == NULL)
+		uncompressed_buffer = (char *) palloc(PGLZ_MAX_OUTPUT(4 * BLCKSZ));
+	if (compressed_rdata == NULL)
+		compressed_rdata = palloc(sizeof(XLogRecData));
 
 	/*
 	 * Allocate a buffer to hold the header information for a WAL record.
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 67d6223..debcdf7 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -20,6 +20,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
+#include "utils/pg_lzcompress.h"
 
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
 
@@ -953,6 +954,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 	uint32		datatotal;
 	RelFileNode *rnode = NULL;
 	uint8		block_id;
+	char	   *uncompressed_record = NULL;
 
 	ResetDecoder(state);
 
@@ -962,6 +964,21 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 	ptr += SizeOfXLogRecord;
 	remaining = record->xl_tot_len - SizeOfXLogRecord;
 
+	/*
+	 * Check if the whole record has been compressed and decompress it
+	 * if this is the case. The rest of the processing depends on that.
+	 */
+	if ((record->xl_info & XLR_COMPRESSED) != 0)
+	{
+		PGLZ_Header *header = (PGLZ_Header *) ptr;
+
+		uncompressed_record = (char *) palloc(PGLZ_RAW_SIZE(header));
+		remaining = PGLZ_RAW_SIZE(header);
+		/* XXX: should check for status code here */
+		pglz_decompress(header, uncompressed_record);
+		ptr = uncompressed_record;
+	}
+
 	/* Decode the headers */
 	datatotal = 0;
 	while (remaining > datatotal)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b1bff7f..249c7d9 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -929,6 +929,16 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
+		{"wal_compression", PGC_USERSET, WAL_SETTINGS,
+			 gettext_noop("Compresses full-page writes written in WAL file."),
+			 NULL
+		},
+		&wal_compression,
+		false,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
 			gettext_noop("Logs each checkpoint."),
 			NULL
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index d06fbc0..6bdfa4a 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -98,6 +98,7 @@ extern char *XLogArchiveCommand;
 extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool wal_log_hints;
+extern bool wal_compression;
 extern bool log_checkpoints;
 
 /* WAL levels */
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 11ddfac..cc209d2 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -71,6 +71,11 @@ typedef struct XLogRecord
 #define XLR_SPECIAL_REL_UPDATE	0x01
 
 /*
+ * Flag defining if record is compressed.
+ */
+#define XLR_COMPRESSED			0x02
+
+/*
  * Header info for block data appended to an XLOG record.
  *
  * Note that we don't attempt to align the XLogRecordBlockHeader struct!
-- 
2.2.0

