diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index adab2f8..57660d3 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2476,6 +2476,35 @@ include_dir 'conf.d'
+
+ wal_consistency (string)
+
+ <varname>wal_consistency</varname> configuration parameter
+
+
+
+
+ This parameter is used to check the consistency of WAL records, i.e.,
+ whether the WAL records are inserted and applied correctly. When
+ wal_consistency is enabled for a WAL record, a full-page
+ image is stored along with the record. During redo, once the record has
+ been applied, that image is compared against the current page to check
+ whether both are consistent.
+
+
+
+ By default, this setting is an empty string, which disables the
+ check. To check all records written to the write-ahead log, set this
+ parameter to all. To check only some records, specify a
+ comma-separated list of resource managers. The resource managers
+ currently supported are <literal>heap2</literal>,
+ <literal>heap</literal>, <literal>btree</literal>, <literal>gin</literal>,
+ <literal>gist</literal>, <literal>spgist</literal>, <literal>sequence</literal>
+ and <literal>brin</literal>. Only superusers can change this setting.
+
+
+
+
wal_buffers (integer)
diff --git a/src/backend/access/brin/brin_xlog.c b/src/backend/access/brin/brin_xlog.c
index 5a6b728..2af524d 100644
--- a/src/backend/access/brin/brin_xlog.c
+++ b/src/backend/access/brin/brin_xlog.c
@@ -14,6 +14,7 @@
#include "access/brin_pageops.h"
#include "access/brin_xlog.h"
#include "access/xlogutils.h"
+#include "storage/bufmask.h"
/*
@@ -279,3 +280,38 @@ brin_redo(XLogReaderState *record)
elog(PANIC, "brin_redo: unknown op code %u", info);
}
}
+
+/*
+ * brin_mask
+ *		Normalize a BRIN page image ahead of a WAL consistency comparison.
+ *
+ * The LSN and header hint bits are always masked.  Regular pages also get
+ * their unused space masked and every used line pointer reset to LP_UNUSED.
+ * Other page types (meta, revmap) get no further masking; blkno is unused.
+ */
+void
+brin_mask(char *page, BlockNumber blkno)
+{
+	Page		bpage = (Page) page;
+	OffsetNumber last;
+	OffsetNumber cur;
+
+	mask_page_lsn(bpage);
+	mask_page_hint_bits(bpage);
+
+	/* Nothing more to do for pages that are not regular BRIN pages. */
+	if (!BRIN_IS_REGULAR_PAGE(bpage))
+		return;
+
+	mask_unused_space(bpage);
+
+	last = PageGetMaxOffsetNumber(bpage);
+	for (cur = FirstOffsetNumber; cur <= last; cur = OffsetNumberNext(cur))
+	{
+		ItemId		lp = PageGetItemId(bpage, cur);
+
+		if (!ItemIdIsUsed(lp))
+			continue;
+		lp->lp_flags = LP_UNUSED;
+	}
+}
diff --git a/src/backend/access/gin/ginxlog.c b/src/backend/access/gin/ginxlog.c
index a40f168..f8604db 100644
--- a/src/backend/access/gin/ginxlog.c
+++ b/src/backend/access/gin/ginxlog.c
@@ -15,6 +15,7 @@
#include "access/gin_private.h"
#include "access/xlogutils.h"
+#include "storage/bufmask.h"
#include "utils/memutils.h"
static MemoryContext opCtx; /* working memory for operations */
@@ -758,3 +759,31 @@ gin_xlog_cleanup(void)
MemoryContextDelete(opCtx);
opCtx = NULL;
}
+
+/*
+ * gin_mask
+ *		Normalize a GIN page image ahead of a WAL consistency comparison.
+ *
+ * blkno is accepted to match the rmgr masking callback but is not used.
+ */
+void
+gin_mask(char *page, BlockNumber blkno)
+{
+	Page		gpage = (Page) page;
+	GinPageOpaque gopaque;
+
+	mask_page_lsn(gpage);
+	gopaque = GinPageGetOpaque(gpage);
+
+	/* The metapage makes no use of pd_lower/pd_upper: only its LSN is masked */
+	if (gopaque->flags == GIN_META)
+		return;
+
+	mask_page_hint_bits(gpage);
+
+	/* A deleted page is initialized to empty, so its whole image is masked */
+	if ((gopaque->flags & GIN_DELETED) != 0)
+		memset(gpage, MASK_MARKER, BLCKSZ);
+	else
+		mask_unused_space(gpage);
+}
diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c
index 5853d76..f7abb9c 100644
--- a/src/backend/access/gist/gistxlog.c
+++ b/src/backend/access/gist/gistxlog.c
@@ -16,6 +16,7 @@
#include "access/gist_private.h"
#include "access/xloginsert.h"
#include "access/xlogutils.h"
+#include "storage/bufmask.h"
#include "utils/memutils.h"
static MemoryContext opCtx; /* working memory for operations */
@@ -343,6 +344,52 @@ gist_xlog_cleanup(void)
}
/*
+ * Mask a GiST page before running consistency checks on it.
+ */
+void
+gist_mask(char *page, BlockNumber blkno)
+{
+	Page		gpage = (Page) page;
+
+	/* Page header: LSN, hint bits and the pd_lower..pd_upper hole. */
+	mask_page_lsn(gpage);
+	mask_page_hint_bits(gpage);
+	mask_unused_space(gpage);
+
+	/* Overwrite the NSN with a fixed value so that it is not compared. */
+	GistPageSetNSN(gpage, PG_UINT64_MAX);
+
+	/*
+	 * F_FOLLOW_RIGHT is updated on the left child after the WAL record has
+	 * been written, so mask this flag.
+	 */
+	GistMarkFollowRight(gpage);
+
+	/*
+	 * On leaf pages, mask line pointer flags, particularly those marked as
+	 * used on a master and unused on a standby.
+	 */
+	if (GistPageIsLeaf(gpage))
+	{
+		OffsetNumber last = PageGetMaxOffsetNumber(gpage);
+		OffsetNumber cur;
+
+		for (cur = FirstOffsetNumber; cur <= last; cur = OffsetNumberNext(cur))
+		{
+			ItemId		lp = PageGetItemId(gpage, cur);
+
+			if (ItemIdIsUsed(lp))
+				lp->lp_flags = LP_UNUSED;
+		}
+	}
+
+	/*
+	 * GiST redo never marks a page as having garbage, so mask that flag.
+	 */
+	GistClearPageHasGarbage(gpage);
+}
+
+/*
* Write WAL record of a page split.
*/
XLogRecPtr
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b019bc1..c5fe761 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -57,6 +57,7 @@
#include "catalog/namespace.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "storage/bufmask.h"
#include "storage/bufmgr.h"
#include "storage/freespace.h"
#include "storage/lmgr.h"
@@ -9131,3 +9132,61 @@ heap_sync(Relation rel)
heap_close(toastrel, AccessShareLock);
}
}
+
+/*
+ * Mask a heap page before performing consistency checks on it: page LSN,
+ * header hint bits, unused space, and per tuple the transaction-status
+ * infomask bits, command ID and alignment padding.  blkno is only used to
+ * set t_ctid of speculative tuples.
+ */
+void
+heap_mask(char *page, BlockNumber blkno)
+{
+	Page		page_norm = (Page) page;
+	OffsetNumber off;
+
+	mask_page_lsn(page_norm);
+
+	mask_page_hint_bits(page_norm);
+	mask_unused_space(page_norm);
+
+	for (off = 1; off <= PageGetMaxOffsetNumber(page_norm); off++)
+	{
+		ItemId		iid = PageGetItemId(page_norm, off);
+		char	   *page_item = (char *) (page_norm + ItemIdGetOffset(iid));
+
+		if (ItemIdIsNormal(iid))
+		{
+			HeapTupleHeader page_htup = (HeapTupleHeader) page_item;
+
+			/*
+			 * Ignore hint bits and command ID.  The mask is OR-ed in rather
+			 * than assigned, so that infomask bits outside HEAP_XACT_MASK
+			 * (HEAP_HASNULL and friends) still take part in the comparison.
+			 */
+			page_htup->t_infomask |= HEAP_XACT_MASK;
+			page_htup->t_choice.t_heap.t_field3.t_cid = PG_UINT32_MAX;
+
+			/*
+			 * For a speculative tuple, the content of t_ctid is conflicting
+			 * between the backup page and current page. Hence, we set it
+			 * to the current block number and current offset.
+			 */
+			if (HeapTupleHeaderIsSpeculative(page_htup))
+				ItemPointerSet(&page_htup->t_ctid, blkno, off);
+		}
+
+		/*
+		 * Ignore any padding bytes after the tuple, when the length of
+		 * the item is not MAXALIGNed.
+		 */
+		if (ItemIdHasStorage(iid))
+		{
+			int			len = ItemIdGetLength(iid);
+			int			padlen = MAXALIGN(len) - len;
+
+			if (padlen > 0)
+				memset(page_item + len, MASK_MARKER, padlen);
+		}
+	}
+}
diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c
index c536e22..bd1e353 100644
--- a/src/backend/access/nbtree/nbtxlog.c
+++ b/src/backend/access/nbtree/nbtxlog.c
@@ -19,6 +19,7 @@
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
+#include "storage/bufmask.h"
#include "storage/procarray.h"
#include "miscadmin.h"
@@ -1028,3 +1029,56 @@ btree_redo(XLogReaderState *record)
elog(PANIC, "btree_redo: unknown op code %u", info);
}
}
+
+/*
+ * btree_mask
+ *		Normalize a btree page image ahead of a WAL consistency comparison.
+ *
+ * blkno is accepted to match the rmgr masking callback but is not used.
+ */
+void
+btree_mask(char *page, BlockNumber blkno)
+{
+	Page		bpage = (Page) page;
+	PageHeader	bhdr = (PageHeader) bpage;
+	BTPageOpaque bopaque;
+
+	mask_page_lsn(bpage);
+	mask_page_hint_bits(bpage);
+	mask_unused_space(bpage);
+
+	bopaque = (BTPageOpaque) PageGetSpecialPointer(bpage);
+
+	if ((bopaque->btpo_flags & BTP_DELETED) == 0)
+	{
+		/*
+		 * Page not marked deleted: mask line pointer flags, particularly
+		 * those marked as used on a master and unused on a standby.
+		 */
+		OffsetNumber last = PageGetMaxOffsetNumber(bpage);
+		OffsetNumber cur;
+
+		for (cur = 1; cur <= last; cur++)
+		{
+			ItemId		lp = PageGetItemId(bpage, cur);
+
+			if (ItemIdIsUsed(lp))
+				lp->lp_flags = LP_UNUSED;
+		}
+	}
+	else
+	{
+		/*
+		 * Deleted page: mask everything that follows the standard page
+		 * header, up to the end of the block, plus pd_lower and pd_upper.
+		 */
+		memset(bpage + SizeOfPageHeaderData, MASK_MARKER,
+			   BLCKSZ - SizeOfPageHeaderData);
+		memset(&bhdr->pd_lower, MASK_MARKER, sizeof(uint16));
+		memset(&bhdr->pd_upper, MASK_MARKER, sizeof(uint16));
+	}
+
+	/* Finally, mask the split-end and garbage flags and the cycle ID. */
+	bopaque->btpo_flags |= BTP_SPLIT_END | BTP_HAS_GARBAGE;
+	bopaque->btpo_cycleid = 0;
+}
diff --git a/src/backend/access/rmgrdesc/gindesc.c b/src/backend/access/rmgrdesc/gindesc.c
index db832a5..75d0e09 100644
--- a/src/backend/access/rmgrdesc/gindesc.c
+++ b/src/backend/access/rmgrdesc/gindesc.c
@@ -113,7 +113,12 @@ gin_desc(StringInfo buf, XLogReaderState *record)
(ginxlogRecompressDataLeaf *) payload;
if (XLogRecHasBlockImage(record, 0))
- appendStringInfoString(buf, " (full page image)");
+ {
+ if (XLogRecBlockImageApply(record, 0))
+ appendStringInfoString(buf, " (full page image, apply)");
+ else
+ appendStringInfoString(buf, " (full page image)");
+ }
else
desc_recompress_leaf(buf, insertData);
}
@@ -147,7 +152,12 @@ gin_desc(StringInfo buf, XLogReaderState *record)
ginxlogVacuumDataLeafPage *xlrec = (ginxlogVacuumDataLeafPage *) rec;
if (XLogRecHasBlockImage(record, 0))
- appendStringInfoString(buf, " (full page image)");
+ {
+ if (XLogRecBlockImageApply(record, 0))
+ appendStringInfoString(buf, " (full page image, apply)");
+ else
+ appendStringInfoString(buf, " (full page image)");
+ }
else
desc_recompress_leaf(buf, &xlrec->data);
}
diff --git a/src/backend/access/spgist/spgxlog.c b/src/backend/access/spgist/spgxlog.c
index e016cdb..f66f73a 100644
--- a/src/backend/access/spgist/spgxlog.c
+++ b/src/backend/access/spgist/spgxlog.c
@@ -18,6 +18,7 @@
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
+#include "storage/bufmask.h"
#include "storage/standby.h"
#include "utils/memutils.h"
@@ -1023,3 +1024,19 @@ spg_xlog_cleanup(void)
MemoryContextDelete(opCtx);
opCtx = NULL;
}
+
+/*
+ * Normalize an SP-GiST page before a WAL consistency comparison.  Unused
+ * space is masked on every page except the metapage.
+ */
+void
+spg_mask(char *page, BlockNumber blkno)
+{
+	Page		spage = (Page) page;
+
+	mask_page_lsn(spage);
+	mask_page_hint_bits(spage);
+	if (SpGistPageIsMeta(spage))
+		return;
+	mask_unused_space(spage);
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 9bb1362..eae7524 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -30,8 +30,8 @@
#include "utils/relmapper.h"
/* must be kept in sync with RmgrData definition in xlog_internal.h */
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
- { name, redo, desc, identify, startup, cleanup },
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
+ { name, redo, desc, identify, startup, cleanup, mask },
const RmgrData RmgrTable[RM_MAX_ID + 1] = {
#include "access/rmgrlist.h"
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6cec027..a8355659 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -95,6 +95,8 @@ bool EnableHotStandby = false;
bool fullPageWrites = true;
bool wal_log_hints = false;
bool wal_compression = false;
+char *wal_consistency_string = NULL;
+bool *wal_consistency = NULL;
bool log_checkpoints = false;
int sync_method = DEFAULT_SYNC_METHOD;
int wal_level = WAL_LEVEL_MINIMAL;
@@ -245,6 +247,10 @@ bool InArchiveRecovery = false;
/* Was the last xlog file restored from archive, or local? */
static bool restoredFromArchive = false;
+/* Buffers dedicated to consistency checks of size BLCKSZ */
+static char *new_page_masked = NULL;
+static char *old_page_masked = NULL;
+
/* options taken from recovery.conf for archive recovery */
char *recoveryRestoreCommand = NULL;
static char *recoveryEndCommand = NULL;
@@ -867,6 +873,7 @@ static char *GetXLogBuffer(XLogRecPtr ptr);
static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
+static void checkConsistency(XLogReaderState *record);
static void WALInsertLockAcquire(void);
static void WALInsertLockAcquireExclusive(void);
@@ -1262,6 +1269,99 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
}
/*
+ * Checks whether the current buffer page and the backup page stored in the
+ * WAL record are consistent. Before comparing the two pages, the resource
+ * manager's masking function is applied to both, to ignore areas such as
+ * hint bits or the unused space between pd_lower and pd_upper. A mismatch
+ * is reported with elog(FATAL). This function should be called once WAL
+ * replay has been completed for a given record.
+ */
+static void
+checkConsistency(XLogReaderState *record)
+{
+ RmgrId rmid = XLogRecGetRmid(record);
+ RelFileNode rnode;
+ ForkNumber forknum;
+ BlockNumber blkno;
+ int block_id;
+
+ /* Records without any block reference have nothing to be checked. */
+ if (!XLogRecHasAnyBlockRefs(record))
+ return;
+
+ /*
+ * Leave if no masking function is defined. This is possible for
+ * resource managers that generate only full-page writes; comparing an
+ * image to itself has no meaning in those cases.
+ */
+ if (RmgrTable[rmid].rm_mask == NULL)
+ return;
+
+ Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
+
+ for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ {
+ Buffer buf;
+ Page new_page;
+
+ if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+ {
+ /* No block tag available for this block_id. Do nothing. */
+ continue;
+ }
+
+ Assert(XLogRecHasBlockImage(record, block_id));
+
+ /*
+ * If the image is flagged to be applied during redo, the page was
+ * just restored from it, so comparing the two is pointless.
+ */
+ if (XLogRecBlockImageApply(record, block_id))
+ continue;
+
+ /*
+ * Get the buffer holding the current contents of the block; the
+ * block is skipped when no valid buffer is returned.
+ */
+ buf = XLogReadBufferExtended(rnode, forknum, blkno,
+ RBM_NORMAL);
+ if (!BufferIsValid(buf))
+ continue;
+
+ new_page = BufferGetPage(buf);
+
+ /*
+ * Read the contents from the backup copy, stored in WAL record
+ * and store it in a temporary page. There is no need to allocate
+ * a new page here, a local buffer is fine to hold its contents and
+ * a mask can be directly applied on it.
+ */
+ if (!RestoreBlockImage(record, block_id, old_page_masked))
+ elog(ERROR, "failed to restore block image");
+
+ /*
+ * Take a copy of the new page where WAL has been applied to have
+ * a comparison base before masking it...
+ */
+ memcpy(new_page_masked, new_page, BLCKSZ);
+
+ /* The buffer is not needed anymore now that a copy has been taken */
+ ReleaseBuffer(buf);
+
+ /* ... And mask both the new and old pages */
+ RmgrTable[rmid].rm_mask(new_page_masked, blkno);
+ RmgrTable[rmid].rm_mask(old_page_masked, blkno);
+
+ /* Any byte still differing after masking is an inconsistency */
+ if (memcmp(new_page_masked, old_page_masked, BLCKSZ) != 0)
+ elog(FATAL,
+ "Inconsistent page found, rel %u/%u/%u, forknum %u, blkno %u",
+ rnode.spcNode, rnode.dbNode, rnode.relNode,
+ forknum, blkno);
+ }
+}
+
+/*
* Subroutine of XLogInsertRecord. Copies a WAL record to an already-reserved
* area in the WAL.
*/
@@ -6149,6 +6249,13 @@ StartupXLOG(void)
errdetail("Failed while allocating an XLog reading processor.")));
xlogreader->system_identifier = ControlFile->system_identifier;
+ /*
+ * Allocate pages dedicated to WAL consistency checks, those had better
+ * be aligned.
+ */
+ new_page_masked = (char *) palloc(BLCKSZ);
+ old_page_masked = (char *) palloc(BLCKSZ);
+
if (read_backup_label(&checkPointLoc, &backupEndRequired,
&backupFromStandby))
{
@@ -6949,6 +7056,15 @@ StartupXLOG(void)
/* Now apply the WAL record itself */
RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+ /*
+ * After redo, check whether the backup pages associated with
+ * the WAL record are consistent with the existing pages. This
+ * check is done only if consistency check is enabled for this
+ * record.
+ */
+ if ((record->xl_info & XLR_CHECK_CONSISTENCY) != 0)
+ checkConsistency(xlogreader);
+
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -7479,6 +7595,12 @@ StartupXLOG(void)
}
XLogReaderFree(xlogreader);
+ /* Clean up buffers dedicated to WAL consistency checks */
+ if (old_page_masked)
+ pfree(old_page_masked);
+ if (new_page_masked)
+ pfree(new_page_masked);
+
/*
* If any of the critical GUCs have changed, log them before we allow
* backends to write WAL.
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 3cd273b..c635844 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -414,10 +414,12 @@ XLogInsert(RmgrId rmid, uint8 info)
elog(ERROR, "XLogBeginInsert was not called");
/*
- * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
- * reserved for use by me.
+ * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
+ * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
*/
- if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
+ if ((info & ~(XLR_RMGR_INFO_MASK |
+ XLR_SPECIAL_REL_UPDATE |
+ XLR_CHECK_CONSISTENCY)) != 0)
elog(PANIC, "invalid xlog info mask %02X", info);
TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
@@ -498,6 +500,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
hdr_rdt.data = hdr_scratch;
/*
+ * Enforce consistency checks for this record if the user asked for
+ * them. Do this at the beginning of this routine, so that callers of
+ * XLogInsert() also have the possibility to pass XLR_CHECK_CONSISTENCY
+ * directly for a record.
+ */
+ if (wal_consistency[rmid])
+ info |= XLR_CHECK_CONSISTENCY;
+
+ /*
* Make an rdata chain containing all the data portions of all block
* references. This includes the data for full-page images. Also append
* the headers for the block references in the scratch buffer.
@@ -513,6 +524,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecordBlockCompressHeader cbimg = {0};
bool samerel;
bool is_compressed = false;
+ bool include_image;
if (!regbuf->in_use)
continue;
@@ -556,7 +568,14 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
- if (needs_backup)
+ /*
+ * If needs_backup is true or wal consistency check is enabled for
+ * current resource manager, log a full-page write for the current
+ * block.
+ */
+ include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
+
+ if (include_image)
{
Page page = regbuf->page;
uint16 compressed_len;
@@ -618,6 +637,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+ /*
+ * If WAL consistency is enabled for the resource manager of
+ * this WAL record, a full-page image is included in the record
+ * for the block modified. During redo, the full-page is replayed
+ * only if BKPIMAGE_APPLY is set.
+ */
+ if (needs_backup)
+ bimg.bimg_info |= BKPIMAGE_APPLY;
+
if (is_compressed)
{
bimg.length = compressed_len;
@@ -680,7 +708,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
/* Ok, copy the header to the scratch buffer */
memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
scratch += SizeOfXLogRecordBlockHeader;
- if (needs_backup)
+ if (include_image)
{
memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
scratch += SizeOfXLogRecordBlockImageHeader;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 56d4c66..4be6373 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -997,6 +997,7 @@ ResetDecoder(XLogReaderState *state)
state->blocks[block_id].in_use = false;
state->blocks[block_id].has_image = false;
state->blocks[block_id].has_data = false;
+ state->blocks[block_id].apply_image = false;
}
state->max_block_id = -1;
}
@@ -1089,6 +1090,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
blk = &state->blocks[block_id];
blk->in_use = true;
+ blk->apply_image = false;
COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
@@ -1120,6 +1122,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
+
+ blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
+
if (blk->bimg_info & BKPIMAGE_IS_COMPRESSED)
{
if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
@@ -1243,6 +1248,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
if (!blk->in_use)
continue;
+
+ Assert(blk->has_image || !blk->apply_image);
+
if (blk->has_image)
{
blk->bkp_image = ptr;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 51a8e8d..651faf2 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -275,9 +275,9 @@ XLogCheckInvalidPages(void)
* will complain if we don't have the lock. In hot standby mode it's
* definitely necessary.)
*
- * Note: when a backup block is available in XLOG, we restore it
- * unconditionally, even if the page in the database appears newer. This is
- * to protect ourselves against database pages that were partially or
+ * Note: when a backup block is available in XLOG with BKPIMAGE_APPLY flag
+ * set, we restore it, even if the page in the database appears newer. This
+ * is to protect ourselves against database pages that were partially or
* incorrectly written during a crash. We assume that the XLOG data must be
* good because it has passed a CRC check, while the database page might not
* be. This will force us to replay all subsequent modifications of the page
@@ -310,9 +310,11 @@ XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id)
* XLogReadBufferForRedoExtended
* Like XLogReadBufferForRedo, but with extra options.
*
- * In RBM_ZERO_* modes, if the page doesn't exist, the relation is extended
- * with all-zeroes pages up to the referenced block number. In
- * RBM_ZERO_AND_LOCK and RBM_ZERO_AND_CLEANUP_LOCK modes, the return value
+ * In RBM_ZERO_* modes, if the page doesn't exist or BKPIMAGE_APPLY flag
+ * is not set for the backup block, the relation is extended with all-zeroes
+ * pages up to the referenced block number.
+ *
+ * In RBM_ZERO_AND_LOCK and RBM_ZERO_AND_CLEANUP_LOCK modes, the return value
* is always BLK_NEEDS_REDO.
*
* (The RBM_ZERO_AND_CLEANUP_LOCK mode is redundant with the get_cleanup_lock
@@ -352,9 +354,10 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
if (!willinit && zeromode)
elog(PANIC, "block to be initialized in redo routine must be marked with WILL_INIT flag in the WAL record");
- /* If it's a full-page image, restore it. */
- if (XLogRecHasBlockImage(record, block_id))
+ /* If it has a full-page image and it should be restored, do it. */
+ if (XLogRecBlockImageApply(record, block_id))
{
+ Assert(XLogRecHasBlockImage(record, block_id));
*buf = XLogReadBufferExtended(rnode, forknum, blkno,
get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
page = BufferGetPage(*buf);
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index fc3a8ee..864d6a9 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -31,6 +31,7 @@
#include "funcapi.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
+#include "storage/bufmask.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "storage/smgr.h"
@@ -1646,3 +1647,14 @@ ResetSequenceCaches(void)
last_used_seq = NULL;
}
+
+/*
+ * Normalize a sequence page for WAL consistency checks by masking its LSN
+ * and the unused space between pd_lower and pd_upper.
+ */
+void
+seq_mask(char *page, BlockNumber blkno)
+{
+	mask_page_lsn((Page) page);
+	mask_unused_space((Page) page);
+}
diff --git a/src/backend/storage/buffer/Makefile b/src/backend/storage/buffer/Makefile
index 2c10fba..8630dca 100644
--- a/src/backend/storage/buffer/Makefile
+++ b/src/backend/storage/buffer/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/storage/buffer
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = buf_table.o buf_init.o bufmgr.o freelist.o localbuf.o
+OBJS = buf_table.o buf_init.o bufmask.o bufmgr.o freelist.o localbuf.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/buffer/bufmask.c b/src/backend/storage/buffer/bufmask.c
new file mode 100644
index 0000000..0e062ac
--- /dev/null
+++ b/src/backend/storage/buffer/bufmask.c
@@ -0,0 +1,86 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.c
+ * Routines for buffer masking. Used to mask certain bits
+ * in a page which can be different when the WAL is generated
+ * and when the WAL is applied.
+ *
+ * Portions Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * Contains common routines required for masking a page.
+ *
+ * IDENTIFICATION
+ * src/backend/storage/buffer/bufmask.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/bufmask.h"
+
+/*
+ * mask_page_lsn
+ *
+ * Overwrite pd_lsn with a fixed value.  The LSNs of the two pages compared
+ * in a consistency check will likely differ, because of concurrent
+ * operations when the WAL is generated, so they must not be compared.
+ */
+void
+mask_page_lsn(Page page)
+{
+	PageHeader	header = (PageHeader) page;
+
+	PageXLogRecPtrSet(header->pd_lsn, PG_UINT64_MAX);
+}
+
+/*
+ * mask_page_hint_bits
+ *
+ * Mask the hint-like fields of the page header, so that they do not take
+ * part in the comparison: pd_prune_xid and the PD_PAGE_FULL,
+ * PD_HAS_FREE_LINES and PD_ALL_VISIBLE flags.
+ */
+void
+mask_page_hint_bits(Page page)
+{
+	PageHeader	header = (PageHeader) page;
+
+	/*
+	 * PD_PAGE_FULL and PD_HAS_FREE_LINES are just hints.
+	 *
+	 * XXX: It is unfortunate that PD_ALL_VISIBLE has to be masked as well.
+	 * If the flag is set incorrectly, that's serious, and we would like to
+	 * catch it.  If the flag is cleared incorrectly, that's serious too.
+	 * But redo of HEAP_CLEAN records doesn't currently set the flag, even
+	 * though it is set in the master, so we must silence the failures that
+	 * this causes.
+	 */
+	header->pd_flags |= PD_PAGE_FULL | PD_HAS_FREE_LINES | PD_ALL_VISIBLE;
+
+	/* pd_prune_xid behaves like a hint bit, so ignore it too. */
+	header->pd_prune_xid = PG_UINT32_MAX;
+}
+
+/*
+ * mask_unused_space
+ *
+ * Mask the unused space of a page between pd_lower and pd_upper.  The header
+ * offsets are sanity-checked first; an ERROR is raised, and the page is left
+ * untouched, if they are inconsistent.
+ */
+void
+mask_unused_space(Page page)
+{
+	int			pd_lower = ((PageHeader) page)->pd_lower;
+	int			pd_upper = ((PageHeader) page)->pd_upper;
+	int			pd_special = ((PageHeader) page)->pd_special;
+
+	/* Sanity check; the message must not carry a trailing newline */
+	if (pd_lower > pd_upper || pd_special < pd_upper ||
+		pd_lower < SizeOfPageHeaderData || pd_special > BLCKSZ)
+		elog(ERROR, "invalid page pd_lower %d pd_upper %d pd_special %d",
+			 pd_lower, pd_upper, pd_special);
+
+	memset(page + pd_lower, MASK_MARKER, pd_upper - pd_lower);
+}
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 3c695c1..915d24c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,9 +28,11 @@
#include "access/commit_ts.h"
#include "access/gin.h"
+#include "access/rmgr.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
+#include "access/xlog_internal.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "commands/prepare.h"
@@ -145,6 +147,9 @@ static bool call_enum_check_hook(struct config_enum * conf, int *newval,
static bool check_log_destination(char **newval, void **extra, GucSource source);
static void assign_log_destination(const char *newval, void *extra);
+static bool check_wal_consistency(char **newval, void **extra, GucSource source);
+static void assign_wal_consistency(const char *newval, void *extra);
+
#ifdef HAVE_SYSLOG
static int syslog_facility = LOG_LOCAL0;
#else
@@ -3254,6 +3259,16 @@ static struct config_string ConfigureNamesString[] =
},
{
+ {"wal_consistency", PGC_SUSET, WAL_SETTINGS,
+ gettext_noop("Sets the WAL resource managers for which WAL consistency checks are done."),
+ NULL,
+ GUC_LIST_INPUT
+ },
+ &wal_consistency_string,
+ "",
+ check_wal_consistency, assign_wal_consistency, NULL
+ },
+ {
{"log_destination", PGC_SIGHUP, LOGGING_WHERE,
gettext_noop("Sets the destination for server log output."),
gettext_noop("Valid values are combinations of \"stderr\", "
@@ -9867,6 +9882,121 @@ call_enum_check_hook(struct config_enum * conf, int *newval, void **extra,
*/
+/*
+ * check_hook for wal_consistency.
+ *
+ * Parses the comma-separated list in *newval.  Every element must either
+ * name a resource manager that has a masking function, or be the keyword
+ * "all", which selects all such resource managers; mixing both forms is
+ * rejected.
+ *
+ * newval is only read, and source is not consulted.  On success, *extra
+ * receives a guc_malloc'd array of RM_MAX_ID + 1 booleans telling, for each
+ * resource manager, whether consistency checks are enabled for it, and
+ * true is returned.
+ *
+ * On failure, a detail message is registered with GUC_check_errdetail()
+ * and false is returned; *extra is left alone.
+ */
static bool
+check_wal_consistency(char **newval, void **extra, GucSource source)
+{
+	bool		enabled[RM_MAX_ID + 1];
+	bool		saw_rmgr = false;	/* any individual resource manager? */
+	bool		saw_all = false;	/* 'all' keyword? */
+	bool		valid;
+	char	   *rawstring;
+	List	   *elemlist;
+	ListCell   *cell;
+
+	/* Start with checks disabled for every resource manager */
+	MemSet(enabled, 0, sizeof(enabled));
+
+	/* SplitIdentifierString() needs a string it may modify */
+	rawstring = pstrdup(*newval);
+
+	if (!SplitIdentifierString(rawstring, ',', &elemlist))
+	{
+		/* syntax error in list */
+		GUC_check_errdetail("List syntax is invalid.");
+		pfree(rawstring);
+		list_free(elemlist);
+		return false;
+	}
+
+	valid = true;
+	foreach(cell, elemlist)
+	{
+		char	   *tok = (char *) lfirst(cell);
+		int			rmid;
+
+		/* Find the first resource manager carrying this name, if any */
+		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+		{
+			if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0)
+				break;
+		}
+
+		if (rmid <= RM_MAX_ID)
+		{
+			/*
+			 * The name is known, but the feature exists only for resource
+			 * managers that define a masking function; any other one is
+			 * reported like an unknown keyword.
+			 */
+			if (RmgrTable[rmid].rm_mask != NULL)
+			{
+				enabled[rmid] = true;
+				saw_rmgr = true;
+				continue;
+			}
+		}
+		else if (pg_strcasecmp(tok, "all") == 0)
+		{
+			/* Enable every resource manager that has a masking function */
+			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+			{
+				if (RmgrTable[rmid].rm_mask != NULL)
+					enabled[rmid] = true;
+			}
+			saw_all = true;
+			continue;
+		}
+
+		/* Neither a supported resource manager nor 'all' */
+		GUC_check_errdetail("Unrecognized key word: \"%s\".", tok);
+		valid = false;
+		break;
+	}
+
+	pfree(rawstring);
+	list_free(elemlist);
+
+	if (!valid)
+		return false;
+
+	/*
+	 * The parameter must contain either 'all' or a combination of resource
+	 * managers, not both.
+	 */
+	if (saw_all && saw_rmgr)
+	{
+		GUC_check_errdetail("Invalid value combination");
+		return false;
+	}
+
+	/* Hand the parsed result over to the assign hook through *extra */
+	*extra = guc_malloc(ERROR, sizeof(enabled));
+	memcpy(*extra, enabled, sizeof(enabled));
+	return true;
+}
+
+static void
+assign_wal_consistency(const char *newval, void *extra)
+{
+ wal_consistency = (bool *) extra; /* per-rmgr flag array built by check_wal_consistency() */
+}
+
+static bool
check_log_destination(char **newval, void **extra, GucSource source)
{
char *rawstring;
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 7c2daa5..ca734fe 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -191,6 +191,10 @@
# open_sync
#full_page_writes = on # recover from partial page writes
#wal_compression = off # enable compression of full-page writes
+#wal_consistency = '' # Valid values are combinations of
+ # heap2, heap, btree, gin, gist,
+ # sequence, spgist and brin. It can also
+ # be set to 'all' to enable all the values
#wal_log_hints = off # also do full page writes of non-critical updates
# (change requires restart)
#wal_buffers = -1 # min 32kB, -1 sets based on shared_buffers
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 23ac4e7..a170d01 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -29,7 +29,7 @@
* RmgrNames is an array of resource manager names, to make error messages
* a bit nicer.
*/
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
name,
static const char *RmgrNames[RM_MAX_ID + 1] = {
diff --git a/src/bin/pg_xlogdump/pg_xlogdump.c b/src/bin/pg_xlogdump/pg_xlogdump.c
index d070312..48a3d48 100644
--- a/src/bin/pg_xlogdump/pg_xlogdump.c
+++ b/src/bin/pg_xlogdump/pg_xlogdump.c
@@ -465,7 +465,12 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
rnode.spcNode, rnode.dbNode, rnode.relNode,
blk);
if (XLogRecHasBlockImage(record, block_id))
- printf(" FPW");
+ {
+ if (XLogRecBlockImageApply(record, block_id))
+ printf(" FPW (apply)");
+ else
+ printf(" FPW");
+ }
}
putchar('\n');
}
@@ -489,7 +494,10 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
if (record->blocks[block_id].bimg_info &
BKPIMAGE_IS_COMPRESSED)
{
- printf(" (FPW); hole: offset: %u, length: %u, compression saved: %u\n",
+ printf(" (FPW)%s; hole: offset: %u, length: %u, "
+ "compression saved: %u\n",
+ XLogRecBlockImageApply(record, block_id) ?
+ " apply" : "",
record->blocks[block_id].hole_offset,
record->blocks[block_id].hole_length,
BLCKSZ -
@@ -498,7 +506,9 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
}
else
{
- printf(" (FPW); hole: offset: %u, length: %u\n",
+ printf(" (FPW)%s; hole: offset: %u, length: %u\n",
+ XLogRecBlockImageApply(record, block_id) ?
+ " apply" : "",
record->blocks[block_id].hole_offset,
record->blocks[block_id].hole_length);
}
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index 8fe20ce..5d19a4a 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -32,7 +32,7 @@
#include "storage/standbydefs.h"
#include "utils/relmapper.h"
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
{ name, desc, identify},
const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = {
diff --git a/src/include/access/brin_xlog.h b/src/include/access/brin_xlog.h
index f614805..68192a7 100644
--- a/src/include/access/brin_xlog.h
+++ b/src/include/access/brin_xlog.h
@@ -128,5 +128,6 @@ typedef struct xl_brin_revmap_extend
extern void brin_redo(XLogReaderState *record);
extern void brin_desc(StringInfo buf, XLogReaderState *record);
extern const char *brin_identify(uint8 info);
+extern void brin_mask(char *page, BlockNumber blkno);
#endif /* BRIN_XLOG_H */
diff --git a/src/include/access/gin.h b/src/include/access/gin.h
index e5b2e10..8ec0eeb 100644
--- a/src/include/access/gin.h
+++ b/src/include/access/gin.h
@@ -79,5 +79,6 @@ extern void gin_desc(StringInfo buf, XLogReaderState *record);
extern const char *gin_identify(uint8 info);
extern void gin_xlog_startup(void);
extern void gin_xlog_cleanup(void);
+extern void gin_mask(char *page, BlockNumber blkno);
#endif /* GIN_H */
diff --git a/src/include/access/gist_private.h b/src/include/access/gist_private.h
index 78e87a6..3f8e7b7 100644
--- a/src/include/access/gist_private.h
+++ b/src/include/access/gist_private.h
@@ -460,6 +460,7 @@ extern void gist_desc(StringInfo buf, XLogReaderState *record);
extern const char *gist_identify(uint8 info);
extern void gist_xlog_startup(void);
extern void gist_xlog_cleanup(void);
+extern void gist_mask(char *page, BlockNumber blkno);
extern XLogRecPtr gistXLogUpdate(Buffer buffer,
OffsetNumber *todelete, int ntodelete,
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 06a8242..5cd3022 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -373,6 +373,7 @@ extern void HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
extern void heap_redo(XLogReaderState *record);
extern void heap_desc(StringInfo buf, XLogReaderState *record);
extern const char *heap_identify(uint8 info);
+extern void heap_mask(char *page, BlockNumber blkno);
extern void heap2_redo(XLogReaderState *record);
extern void heap2_desc(StringInfo buf, XLogReaderState *record);
extern const char *heap2_identify(uint8 info);
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index c580f51..006922a 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -775,5 +775,6 @@ extern void _bt_leafbuild(BTSpool *btspool, BTSpool *spool2);
extern void btree_redo(XLogReaderState *record);
extern void btree_desc(StringInfo buf, XLogReaderState *record);
extern const char *btree_identify(uint8 info);
+extern void btree_mask(char *page, BlockNumber blkno);
#endif /* NBTREE_H */
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index ff7fe62..64b92ff 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -19,7 +19,7 @@ typedef uint8 RmgrId;
* Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG
* file format.
*/
-#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup) \
+#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \
symname,
typedef enum RmgrIds
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index a7a0ae2..89182e2 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -25,25 +25,25 @@
*/
-/* symbol name, textual name, redo, desc, identify, startup, cleanup */
+/* symbol name, textual name, redo, desc, identify, startup, cleanup, mask */
-PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL)
-PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL)
-PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL)
-PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL)
-PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL)
-PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL)
-PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL)
-PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL)
-PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL)
-PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL)
-PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL)
-PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL)
-PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL)
-PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup)
-PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
-PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
-PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
-PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
-PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL)
-PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
+PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL)
+PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL)
+PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL)
+PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL)
+PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL)
+PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL)
+PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL)
+PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL)
+PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL)
+PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask)
+PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask)
+PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask)
+PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, NULL)
+PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask)
+PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask)
+PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask)
+PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL)
+PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL)
+PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL)
diff --git a/src/include/access/spgist.h b/src/include/access/spgist.h
index a953a5a..fd6b9f5 100644
--- a/src/include/access/spgist.h
+++ b/src/include/access/spgist.h
@@ -220,5 +220,6 @@ extern void spg_desc(StringInfo buf, XLogReaderState *record);
extern const char *spg_identify(uint8 info);
extern void spg_xlog_startup(void);
extern void spg_xlog_cleanup(void);
+extern void spg_mask(char *page, BlockNumber blkno);
#endif /* SPGIST_H */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c9f332c..295bf09 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -105,6 +105,8 @@ extern bool EnableHotStandby;
extern bool fullPageWrites;
extern bool wal_log_hints;
extern bool wal_compression;
+extern bool *wal_consistency;
+extern char *wal_consistency_string;
extern bool log_checkpoints;
extern int CheckPointSegments;
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index ceb0462..57756b8 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -266,6 +266,10 @@ typedef enum
* "VACUUM". rm_desc can then be called to obtain additional detail for the
* record, if available (e.g. the last block).
*
+ * rm_mask takes as input a page associated with the resource manager's
+ * records and performs masking actions on it for consistency check
+ * comparisons. The input must be an already-allocated page of size BLCKSZ.
+ *
* RmgrTable[] is indexed by RmgrId values (see rmgrlist.h).
*/
typedef struct RmgrData
@@ -276,6 +280,7 @@ typedef struct RmgrData
const char *(*rm_identify) (uint8 info);
void (*rm_startup) (void);
void (*rm_cleanup) (void);
+ void (*rm_mask) (char *page, BlockNumber blkno);
} RmgrData;
extern const RmgrData RmgrTable[];
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index deaa7f5..697a4ef 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -52,6 +52,7 @@ typedef struct
/* Information on full-page image, if any */
bool has_image;
+ bool apply_image; /* Restore image during WAL replay */
char *bkp_image;
uint16 hole_offset;
uint16 hole_length;
@@ -205,6 +206,8 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
((decoder)->blocks[block_id].in_use)
#define XLogRecHasBlockImage(decoder, block_id) \
((decoder)->blocks[block_id].has_image)
+#define XLogRecBlockImageApply(decoder, block_id) \
+ ((decoder)->blocks[block_id].apply_image)
extern bool RestoreBlockImage(XLogReaderState *recoder, uint8 block_id, char *dst);
extern char *XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len);
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 3dfcb49..972d99d 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -56,8 +56,8 @@ typedef struct XLogRecord
/*
* The high 4 bits in xl_info may be used freely by rmgr. The
- * XLR_SPECIAL_REL_UPDATE bit can be passed by XLogInsert caller. The rest
- * are set internally by XLogInsert.
+ * XLR_SPECIAL_REL_UPDATE and XLR_CHECK_CONSISTENCY bits can be passed by
+ * XLogInsert caller. The rest are set internally by XLogInsert.
*/
#define XLR_INFO_MASK 0x0F
#define XLR_RMGR_INFO_MASK 0xF0
@@ -71,6 +71,15 @@ typedef struct XLogRecord
#define XLR_SPECIAL_REL_UPDATE 0x01
/*
+ * Enforces consistency checks of replayed WAL at recovery. If enabled,
+ * each record will log a full-page image for each block modified by the
+ * record, and that image is reused during replay for consistency checks.
+ * The caller of XLogInsert can set this flag if necessary; note that when
+ * wal_consistency is enabled, it is set unconditionally.
+ */
+#define XLR_CHECK_CONSISTENCY 0x02
+
+/*
* Header info for block data appended to an XLOG record.
*
* 'data_length' is the length of the rmgr-specific payload data associated
@@ -137,6 +146,8 @@ typedef struct XLogRecordBlockImageHeader
/* Information stored in bimg_info */
#define BKPIMAGE_HAS_HOLE 0x01 /* page image has "hole" */
#define BKPIMAGE_IS_COMPRESSED 0x02 /* page image is compressed */
+#define BKPIMAGE_APPLY 0x04 /* page image should be restored
+ * during replay */
/*
* Extra header information used when page image has "hole" and
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 392a626..6fd4130 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -82,5 +82,6 @@ extern void ResetSequenceCaches(void);
extern void seq_redo(XLogReaderState *rptr);
extern void seq_desc(StringInfo buf, XLogReaderState *rptr);
extern const char *seq_identify(uint8 info);
+extern void seq_mask(char *page, BlockNumber blkno);
#endif /* SEQUENCE_H */
diff --git a/src/include/storage/bufmask.h b/src/include/storage/bufmask.h
new file mode 100644
index 0000000..ab1a93c
--- /dev/null
+++ b/src/include/storage/bufmask.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * bufmask.h
+ * Definitions for buffer masking routines, used to mask certain bits
+ * in a page that can legitimately differ between the time WAL is
+ * generated and the time it is applied. Those bits are masked before
+ * any page comparison so that the pages compare as consistent.
+ *
+ * Portions Copyright (c) 2016, PostgreSQL Global Development Group
+ *
+ * src/include/storage/bufmask.h
+ */
+
+#ifndef BUFMASK_H
+#define BUFMASK_H
+
+#include "postgres.h"
+#include "storage/block.h"
+#include "storage/bufmgr.h"
+
+/* Marker used to mask pages consistently */
+#define MASK_MARKER 0xFF
+
+extern void mask_page_lsn(Page page);
+extern void mask_page_hint_bits(Page page);
+extern void mask_unused_space(Page page);
+#endif /* BUFMASK_H */