commit 1a770baa3a3f293e8c592f0419d279b7b8bf7b66
Author: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date:   Wed Aug 13 15:39:08 2014 +0300

    Refactor heapam.c redo routines to use XLogReplayBuffer

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index d731f98..bf863af 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -7137,15 +7137,13 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 {
 	xl_heap_clean *xlrec = (xl_heap_clean *) XLogRecGetData(record);
 	Buffer		buffer;
-	Page		page;
-	OffsetNumber *end;
-	OffsetNumber *redirected;
-	OffsetNumber *nowdead;
-	OffsetNumber *nowunused;
-	int			nredirected;
-	int			ndead;
-	int			nunused;
-	Size		freespace;
+	Size		freespace = 0;
+	RelFileNode	rnode;
+	BlockNumber	blkno;
+	XLogReplayResult rc;
+
+	rnode = xlrec->node;
+	blkno = xlrec->block;
 
 	/*
 	 * We're about to remove tuples. In Hot Standby mode, ensure that there's
@@ -7156,65 +7154,62 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 	 * latestRemovedXid is invalid, skip conflict processing.
 	 */
 	if (InHotStandby && TransactionIdIsValid(xlrec->latestRemovedXid))
-		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid,
-											xlrec->node);
+		ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, rnode);
 
 	/*
 	 * If we have a full-page image, restore it (using a cleanup lock) and
 	 * we're done.
 	 */
-	if (record->xl_info & XLR_BKP_BLOCK(0))
-	{
-		(void) RestoreBackupBlock(lsn, record, 0, true, false);
-		return;
-	}
+	rc = XLogReplayBufferExtended(0, rnode, MAIN_FORKNUM, blkno,
+								  RBM_NORMAL, true, &buffer);
+	if (rc == BLK_NEEDS_REDO)
+	{
+		Page		page = (Page) BufferGetPage(buffer);
+		OffsetNumber *end;
+		OffsetNumber *redirected;
+		OffsetNumber *nowdead;
+		OffsetNumber *nowunused;
+		int			nredirected;
+		int			ndead;
+		int			nunused;
+
+		nredirected = xlrec->nredirected;
+		ndead = xlrec->ndead;
+		end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
+		redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
+		nowdead = redirected + (nredirected * 2);
+		nowunused = nowdead + ndead;
+		nunused = (end - nowunused);
+		Assert(nunused >= 0);
+
+		/* Update all item pointers per the record, and repair fragmentation */
+		heap_page_prune_execute(buffer,
+								redirected, nredirected,
+								nowdead, ndead,
+								nowunused, nunused);
+
+		freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
 
-	buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL);
-	if (!BufferIsValid(buffer))
-		return;
-	LockBufferForCleanup(buffer);
-	page = (Page) BufferGetPage(buffer);
+		/*
+		 * Note: we don't worry about updating the page's prunability hints. At
+		 * worst this will cause an extra prune cycle to occur soon.
+		 */
 
-	if (lsn <= PageGetLSN(page))
-	{
-		UnlockReleaseBuffer(buffer);
-		return;
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
 	}
-
-	nredirected = xlrec->nredirected;
-	ndead = xlrec->ndead;
-	end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
-	redirected = (OffsetNumber *) ((char *) xlrec + SizeOfHeapClean);
-	nowdead = redirected + (nredirected * 2);
-	nowunused = nowdead + ndead;
-	nunused = (end - nowunused);
-	Assert(nunused >= 0);
-
-	/* Update all item pointers per the record, and repair fragmentation */
-	heap_page_prune_execute(buffer,
-							redirected, nredirected,
-							nowdead, ndead,
-							nowunused, nunused);
-
-	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
-
-	/*
-	 * Note: we don't worry about updating the page's prunability hints. At
-	 * worst this will cause an extra prune cycle to occur soon.
-	 */
-
-	PageSetLSN(page, lsn);
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 
 	/*
 	 * Update the FSM as well.
 	 *
-	 * XXX: We don't get here if the page was restored from full page image.
+	 * XXX: Don't do this if the page was restored from full page image.
 	 * We don't bother to update the FSM in that case, it doesn't need to be
 	 * totally accurate anyway.
 	 */
-	XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace);
+	if (rc == BLK_NEEDS_REDO)
+		XLogRecordPageWithFreeSpace(xlrec->node, xlrec->block, freespace);
 }
 
 /*
@@ -7229,6 +7224,15 @@ static void
 heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 {
 	xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+	Buffer		buffer;
+	Page		page;
+	Buffer		vmbuffer = InvalidBuffer;
+	RelFileNode	rnode;
+	BlockNumber	blkno;
+	XLogReplayResult rc;
+
+	rnode = xlrec->node;
+	blkno = xlrec->block;
 
 	/*
 	 * If there are any Hot Standby transactions running that have an xmin
@@ -7240,60 +7244,45 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 	 * rather than killing the transaction outright.
 	 */
 	if (InHotStandby)
-		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, xlrec->node);
+		ResolveRecoveryConflictWithSnapshot(xlrec->cutoff_xid, rnode);
 
 	/*
-	 * If heap block was backed up, restore it. This can only happen with
-	 * checksums enabled.
+	 * Read the heap page, if it still exists. If the heap file has
+	 * dropped or truncated later in recovery, we don't need to update the
+	 * page, but we'd better still update the visibility map.
 	 */
-	if (record->xl_info & XLR_BKP_BLOCK(1))
+	rc = XLogReplayBuffer(1, rnode, blkno, &buffer);
+	if (rc == BLK_NEEDS_REDO)
 	{
-		Assert(DataChecksumsEnabled());
-		(void) RestoreBackupBlock(lsn, record, 1, false, false);
+		/*
+		 * We don't bump the LSN of the heap page when setting the
+		 * visibility map bit (unless checksums are enabled, in which case
+		 * we must), because that would generate an unworkable volume of
+		 * full-page writes.  This exposes us to torn page hazards, but
+		 * since we're not inspecting the existing page contents in any
+		 * way, we don't care.
+		 *
+		 * However, all operations that clear the visibility map bit *do*
+		 * bump the LSN, and those operations will only be replayed if the
+		 * XLOG LSN follows the page LSN.  Thus, if the page LSN has
+		 * advanced past our XLOG record's LSN, we mustn't mark the page
+		 * all-visible, because the subsequent update won't be replayed to
+		 * clear the flag.
+		 */
+		page = BufferGetPage(buffer);
+		PageSetAllVisible(page);
+		MarkBufferDirty(buffer);
 	}
-	else
+	else if (rc == BLK_RESTORED)
 	{
-		Buffer		buffer;
-		Page		page;
-
 		/*
-		 * Read the heap page, if it still exists. If the heap file has been
-		 * dropped or truncated later in recovery, we don't need to update the
-		 * page, but we'd better still update the visibility map.
+		 * If heap block was backed up, restore it. This can only happen with
+		 * checksums enabled.
 		 */
-		buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM,
-										xlrec->block, RBM_NORMAL);
-		if (BufferIsValid(buffer))
-		{
-			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-			page = (Page) BufferGetPage(buffer);
-
-			/*
-			 * We don't bump the LSN of the heap page when setting the
-			 * visibility map bit (unless checksums are enabled, in which case
-			 * we must), because that would generate an unworkable volume of
-			 * full-page writes.  This exposes us to torn page hazards, but
-			 * since we're not inspecting the existing page contents in any
-			 * way, we don't care.
-			 *
-			 * However, all operations that clear the visibility map bit *do*
-			 * bump the LSN, and those operations will only be replayed if the
-			 * XLOG LSN follows the page LSN.  Thus, if the page LSN has
-			 * advanced past our XLOG record's LSN, we mustn't mark the page
-			 * all-visible, because the subsequent update won't be replayed to
-			 * clear the flag.
-			 */
-			if (lsn > PageGetLSN(page))
-			{
-				PageSetAllVisible(page);
-				MarkBufferDirty(buffer);
-			}
-
-			/* Done with heap page. */
-			UnlockReleaseBuffer(buffer);
-		}
+		Assert(DataChecksumsEnabled());
 	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 
 	/*
 	 * Even if we skipped the heap page update due to the LSN interlock, it's
@@ -7306,10 +7295,9 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 	else
 	{
 		Relation	reln;
-		Buffer		vmbuffer = InvalidBuffer;
 
-		reln = CreateFakeRelcacheEntry(xlrec->node);
-		visibilitymap_pin(reln, xlrec->block, &vmbuffer);
+		reln = CreateFakeRelcacheEntry(rnode);
+		visibilitymap_pin(reln, blkno, &vmbuffer);
 
 		/*
 		 * Don't set the bit if replay has already passed this point.
@@ -7323,7 +7311,7 @@ heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
 		 * real harm is done; and the next VACUUM will fix it.
 		 */
 		if (lsn > PageGetLSN(BufferGetPage(vmbuffer)))
-			visibilitymap_set(reln, xlrec->block, InvalidBuffer, lsn, vmbuffer,
+			visibilitymap_set(reln, blkno, InvalidBuffer, lsn, vmbuffer,
 							  xlrec->cutoff_xid);
 
 		ReleaseBuffer(vmbuffer);
@@ -7351,41 +7339,29 @@ heap_xlog_freeze_page(XLogRecPtr lsn, XLogRecord *record)
 		ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node);
 
 	/* If we have a full-page image, restore it and we're done */
-	if (record->xl_info & XLR_BKP_BLOCK(0))
+	if (XLogReplayBuffer(0, xlrec->node, xlrec->block, &buffer) == BLK_NEEDS_REDO)
 	{
-		(void) RestoreBackupBlock(lsn, record, 0, false, false);
-		return;
-	}
-
-	buffer = XLogReadBuffer(xlrec->node, xlrec->block, false);
-	if (!BufferIsValid(buffer))
-		return;
-
-	page = (Page) BufferGetPage(buffer);
+		page = BufferGetPage(buffer);
 
-	if (lsn <= PageGetLSN(page))
-	{
-		UnlockReleaseBuffer(buffer);
-		return;
-	}
+		/* now execute freeze plan for each frozen tuple */
+		for (ntup = 0; ntup < xlrec->ntuples; ntup++)
+		{
+			xl_heap_freeze_tuple *xlrec_tp;
+			ItemId		lp;
+			HeapTupleHeader tuple;
 
-	/* now execute freeze plan for each frozen tuple */
-	for (ntup = 0; ntup < xlrec->ntuples; ntup++)
-	{
-		xl_heap_freeze_tuple *xlrec_tp;
-		ItemId		lp;
-		HeapTupleHeader tuple;
+			xlrec_tp = &xlrec->tuples[ntup];
+			lp = PageGetItemId(page, xlrec_tp->offset);		/* offsets are one-based */
+			tuple = (HeapTupleHeader) PageGetItem(page, lp);
 
-		xlrec_tp = &xlrec->tuples[ntup];
-		lp = PageGetItemId(page, xlrec_tp->offset);		/* offsets are one-based */
-		tuple = (HeapTupleHeader) PageGetItem(page, lp);
+			heap_execute_freeze_tuple(tuple, xlrec_tp);
+		}
 
-		heap_execute_freeze_tuple(tuple, xlrec_tp);
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
 	}
-
-	PageSetLSN(page, lsn);
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 }
 
 /*
@@ -7425,8 +7401,10 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	ItemId		lp = NULL;
 	HeapTupleHeader htup;
 	BlockNumber blkno;
+	RelFileNode	target_node;
 
 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
+	target_node = xlrec->target.node;
 
 	/*
 	 * The visibility map may need to be fixed even if the heap page is
@@ -7434,7 +7412,7 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	 */
 	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
-		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
+		Relation	reln = CreateFakeRelcacheEntry(target_node);
 		Buffer		vmbuffer = InvalidBuffer;
 
 		visibilitymap_pin(reln, blkno, &vmbuffer);
@@ -7444,51 +7422,40 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
 	}
 
 	/* If we have a full-page image, restore it and we're done */
-	if (record->xl_info & XLR_BKP_BLOCK(0))
+	if (XLogReplayBuffer(0, target_node, blkno, &buffer) == BLK_NEEDS_REDO)
 	{
-		(void) RestoreBackupBlock(lsn, record, 0, false, false);
-		return;
-	}
-
-	buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
-	if (!BufferIsValid(buffer))
-		return;
-	page = (Page) BufferGetPage(buffer);
+		page = (Page) BufferGetPage(buffer);
 
-	if (lsn <= PageGetLSN(page))	/* changes are applied */
-	{
-		UnlockReleaseBuffer(buffer);
-		return;
-	}
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
 
-	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
-	if (PageGetMaxOffsetNumber(page) >= offnum)
-		lp = PageGetItemId(page, offnum);
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "heap_delete_redo: invalid lp");
 
-	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-		elog(PANIC, "heap_delete_redo: invalid lp");
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
 
-	htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-	htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-	htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-	HeapTupleHeaderClearHotUpdated(htup);
-	fix_infomask_from_infobits(xlrec->infobits_set,
-							   &htup->t_infomask, &htup->t_infomask2);
-	HeapTupleHeaderSetXmax(htup, xlrec->xmax);
-	HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		HeapTupleHeaderClearHotUpdated(htup);
+		fix_infomask_from_infobits(xlrec->infobits_set,
+								   &htup->t_infomask, &htup->t_infomask2);
+		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
 
-	/* Mark the page as a candidate for pruning */
-	PageSetPrunable(page, record->xl_xid);
+		/* Mark the page as a candidate for pruning */
+		PageSetPrunable(page, record->xl_xid);
 
-	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
-		PageClearAllVisible(page);
+		if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
 
-	/* Make sure there is no forward chain link in t_ctid */
-	htup->t_ctid = xlrec->target.tid;
-	PageSetLSN(page, lsn);
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+		/* Make sure there is no forward chain link in t_ctid */
+		htup->t_ctid = xlrec->target.tid;
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 }
 
 static void
@@ -7506,9 +7473,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 	HeapTupleHeader htup;
 	xl_heap_header xlhdr;
 	uint32		newlen;
-	Size		freespace;
+	Size		freespace = 0;
+	RelFileNode	target_node;
 	BlockNumber blkno;
+	XLogReplayResult rc;
 
+	target_node = xlrec->target.node;
 	blkno = ItemPointerGetBlockNumber(&(xlrec->target.tid));
 
 	/*
@@ -7517,7 +7487,7 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 	 */
 	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
-		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
+		Relation	reln = CreateFakeRelcacheEntry(target_node);
 		Buffer		vmbuffer = InvalidBuffer;
 
 		visibilitymap_pin(reln, blkno, &vmbuffer);
@@ -7527,81 +7497,70 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
 	}
 
 	/* If we have a full-page image, restore it and we're done */
-	if (record->xl_info & XLR_BKP_BLOCK(0))
-	{
-		(void) RestoreBackupBlock(lsn, record, 0, false, false);
-		return;
-	}
-
 	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
 	{
-		buffer = XLogReadBuffer(xlrec->target.node, blkno, true);
-		Assert(BufferIsValid(buffer));
-		page = (Page) BufferGetPage(buffer);
+		rc = XLogReplayBufferExtended(0, target_node, MAIN_FORKNUM, blkno, RBM_ZERO, false, &buffer);
+		Assert(rc == BLK_NEEDS_REDO);
+		page = BufferGetPage(buffer);
 
 		PageInit(page, BufferGetPageSize(buffer), 0);
 	}
 	else
+		rc = XLogReplayBuffer(0, target_node, blkno, &buffer);
+
+	if (rc == BLK_NEEDS_REDO)
 	{
-		buffer = XLogReadBuffer(xlrec->target.node, blkno, false);
-		if (!BufferIsValid(buffer))
-			return;
-		page = (Page) BufferGetPage(buffer);
+		page = BufferGetPage(buffer);
 
-		if (lsn <= PageGetLSN(page))	/* changes are applied */
-		{
-			UnlockReleaseBuffer(buffer);
-			return;
-		}
-	}
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+			elog(PANIC, "heap_insert_redo: invalid max offset number");
 
-	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
-	if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-		elog(PANIC, "heap_insert_redo: invalid max offset number");
-
-	newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
-	Assert(newlen <= MaxHeapTupleSize);
-	memcpy((char *) &xlhdr,
-		   (char *) xlrec + SizeOfHeapInsert,
-		   SizeOfHeapHeader);
-	htup = &tbuf.hdr;
-	MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
-	/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-	memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
-		   (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
-		   newlen);
-	newlen += offsetof(HeapTupleHeaderData, t_bits);
-	htup->t_infomask2 = xlhdr.t_infomask2;
-	htup->t_infomask = xlhdr.t_infomask;
-	htup->t_hoff = xlhdr.t_hoff;
-	HeapTupleHeaderSetXmin(htup, record->xl_xid);
-	HeapTupleHeaderSetCmin(htup, FirstCommandId);
-	htup->t_ctid = xlrec->target.tid;
+		newlen = record->xl_len - SizeOfHeapInsert - SizeOfHeapHeader;
+		Assert(newlen <= MaxHeapTupleSize);
+		memcpy((char *) &xlhdr,
+			   (char *) xlrec + SizeOfHeapInsert,
+			   SizeOfHeapHeader);
+		htup = &tbuf.hdr;
+		MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+		memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+			   (char *) xlrec + SizeOfHeapInsert + SizeOfHeapHeader,
+			   newlen);
+		newlen += offsetof(HeapTupleHeaderData, t_bits);
+		htup->t_infomask2 = xlhdr.t_infomask2;
+		htup->t_infomask = xlhdr.t_infomask;
+		htup->t_hoff = xlhdr.t_hoff;
+		HeapTupleHeaderSetXmin(htup, record->xl_xid);
+		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		htup->t_ctid = xlrec->target.tid;
 
-	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-	if (offnum == InvalidOffsetNumber)
-		elog(PANIC, "heap_insert_redo: failed to add tuple");
+		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+		if (offnum == InvalidOffsetNumber)
+			elog(PANIC, "heap_insert_redo: failed to add tuple");
 
-	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
+		freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
 
-	PageSetLSN(page, lsn);
+		PageSetLSN(page, lsn);
 
-	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
-		PageClearAllVisible(page);
+		if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
 
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 
 	/*
 	 * If the page is running low on free space, update the FSM as well.
 	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
 	 * better than that without knowing the fill-factor for the table.
 	 *
-	 * XXX: We don't get here if the page was restored from full page image.
+	 * XXX: Don't do this if the page was restored from full page image.
 	 * We don't bother to update the FSM in that case, it doesn't need to be
 	 * totally accurate anyway.
 	 */
-	if (freespace < BLCKSZ / 5)
+	if (rc == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
 		XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace);
 }
 
@@ -7613,6 +7572,8 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 {
 	char	   *recdata = XLogRecGetData(record);
 	xl_heap_multi_insert *xlrec;
+	RelFileNode rnode;
+	BlockNumber blkno;
 	Buffer		buffer;
 	Page		page;
 	struct
@@ -7622,10 +7583,10 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 	}			tbuf;
 	HeapTupleHeader htup;
 	uint32		newlen;
-	Size		freespace;
-	BlockNumber blkno;
+	Size		freespace = 0;
 	int			i;
 	bool		isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+	XLogReplayResult rc;
 
 	/*
 	 * Insertion doesn't overwrite MVCC data, so no conflict processing is
@@ -7635,6 +7596,9 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 	xlrec = (xl_heap_multi_insert *) recdata;
 	recdata += SizeOfHeapMultiInsert;
 
+	rnode = xlrec->node;
+	blkno = xlrec->blkno;
+
 	/*
 	 * If we're reinitializing the page, the tuples are stored in order from
 	 * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL
@@ -7643,15 +7607,13 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 	if (!isinit)
 		recdata += sizeof(OffsetNumber) * xlrec->ntuples;
 
-	blkno = xlrec->blkno;
-
 	/*
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
 	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
-		Relation	reln = CreateFakeRelcacheEntry(xlrec->node);
+		Relation	reln = CreateFakeRelcacheEntry(rnode);
 		Buffer		vmbuffer = InvalidBuffer;
 
 		visibilitymap_pin(reln, blkno, &vmbuffer);
@@ -7660,94 +7622,82 @@ heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
 		FreeFakeRelcacheEntry(reln);
 	}
 
-	/* If we have a full-page image, restore it and we're done */
-	if (record->xl_info & XLR_BKP_BLOCK(0))
-	{
-		(void) RestoreBackupBlock(lsn, record, 0, false, false);
-		return;
-	}
-
 	if (isinit)
 	{
-		buffer = XLogReadBuffer(xlrec->node, blkno, true);
-		Assert(BufferIsValid(buffer));
-		page = (Page) BufferGetPage(buffer);
+		rc = XLogReplayBufferExtended(0, rnode, MAIN_FORKNUM, blkno,
+									  RBM_ZERO, false, &buffer);
+		Assert(rc == BLK_NEEDS_REDO);
+		page = BufferGetPage(buffer);
 
 		PageInit(page, BufferGetPageSize(buffer), 0);
 	}
 	else
-	{
-		buffer = XLogReadBuffer(xlrec->node, blkno, false);
-		if (!BufferIsValid(buffer))
-			return;
-		page = (Page) BufferGetPage(buffer);
+		rc = XLogReplayBuffer(0, rnode, blkno, &buffer);
 
-		if (lsn <= PageGetLSN(page))	/* changes are applied */
-		{
-			UnlockReleaseBuffer(buffer);
-			return;
-		}
-	}
-
-	for (i = 0; i < xlrec->ntuples; i++)
+	if (rc == BLK_NEEDS_REDO)
 	{
-		OffsetNumber offnum;
-		xl_multi_insert_tuple *xlhdr;
+		page = BufferGetPage(buffer);
+		for (i = 0; i < xlrec->ntuples; i++)
+		{
+			OffsetNumber offnum;
+			xl_multi_insert_tuple *xlhdr;
 
-		if (isinit)
-			offnum = FirstOffsetNumber + i;
-		else
-			offnum = xlrec->offsets[i];
-		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-			elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
+			if (isinit)
+				offnum = FirstOffsetNumber + i;
+			else
+				offnum = xlrec->offsets[i];
+			if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+				elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
+
+			xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
+			recdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+
+			newlen = xlhdr->datalen;
+			Assert(newlen <= MaxHeapTupleSize);
+			htup = &tbuf.hdr;
+			MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+			/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+			memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+				   (char *) recdata,
+				   newlen);
+			recdata += newlen;
+
+			newlen += offsetof(HeapTupleHeaderData, t_bits);
+			htup->t_infomask2 = xlhdr->t_infomask2;
+			htup->t_infomask = xlhdr->t_infomask;
+			htup->t_hoff = xlhdr->t_hoff;
+			HeapTupleHeaderSetXmin(htup, record->xl_xid);
+			HeapTupleHeaderSetCmin(htup, FirstCommandId);
+			ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+			ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+			offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+			if (offnum == InvalidOffsetNumber)
+				elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
+		}
 
-		xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
-		recdata = ((char *) xlhdr) + SizeOfMultiInsertTuple;
+		freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
 
-		newlen = xlhdr->datalen;
-		Assert(newlen <= MaxHeapTupleSize);
-		htup = &tbuf.hdr;
-		MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
-		/* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-		memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
-			   (char *) recdata,
-			   newlen);
-		recdata += newlen;
+		PageSetLSN(page, lsn);
 
-		newlen += offsetof(HeapTupleHeaderData, t_bits);
-		htup->t_infomask2 = xlhdr->t_infomask2;
-		htup->t_infomask = xlhdr->t_infomask;
-		htup->t_hoff = xlhdr->t_hoff;
-		HeapTupleHeaderSetXmin(htup, record->xl_xid);
-		HeapTupleHeaderSetCmin(htup, FirstCommandId);
-		ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
-		ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+		if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
 
-		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-		if (offnum == InvalidOffsetNumber)
-			elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
+		MarkBufferDirty(buffer);
 	}
-
-	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
-
-	PageSetLSN(page, lsn);
-
-	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
-		PageClearAllVisible(page);
-
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 
 	/*
 	 * If the page is running low on free space, update the FSM as well.
 	 * Arbitrarily, our definition of "low" is less than 20%. We can't do much
 	 * better than that without knowing the fill-factor for the table.
 	 *
-	 * XXX: We don't get here if the page was restored from full page image.
+	 * XXX: Don't do this if the page was restored from full page image.
 	 * We don't bother to update the FSM in that case, it doesn't need to be
 	 * totally accurate anyway.
 	 */
-	if (freespace < BLCKSZ / 5)
+	if (rc == BLK_NEEDS_REDO && freespace < BLCKSZ / 5)
 		XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace);
 }
 
@@ -7758,8 +7708,9 @@ static void
 heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 {
 	xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record);
-	bool		samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) ==
-							ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+	RelFileNode	rnode;
+	BlockNumber	newblk;
+	BlockNumber	oldblk;
 	Buffer		obuffer,
 				nbuffer;
 	Page		page;
@@ -7778,24 +7729,29 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	}			tbuf;
 	xl_heap_header_len xlhdr;
 	uint32		newlen;
-	Size		freespace;
+	Size		freespace = 0;
+	XLogReplayResult old_replay;
+	XLogReplayResult new_replay;
 
 	/* initialize to keep the compiler quiet */
 	oldtup.t_data = NULL;
 	oldtup.t_len = 0;
 
+	rnode = xlrec->target.node;
+	newblk = ItemPointerGetBlockNumber(&xlrec->newtid);
+	oldblk = ItemPointerGetBlockNumber(&xlrec->target.tid);
+
 	/*
 	 * The visibility map may need to be fixed even if the heap page is
 	 * already up-to-date.
 	 */
 	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
 	{
-		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
-		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
+		Relation	reln = CreateFakeRelcacheEntry(rnode);
 		Buffer		vmbuffer = InvalidBuffer;
 
-		visibilitymap_pin(reln, block, &vmbuffer);
-		visibilitymap_clear(reln, block, vmbuffer);
+		visibilitymap_pin(reln, oldblk, &vmbuffer);
+		visibilitymap_clear(reln, oldblk, vmbuffer);
 		ReleaseBuffer(vmbuffer);
 		FreeFakeRelcacheEntry(reln);
 	}
@@ -7810,84 +7766,63 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
 	 * added the new tuple to the new page.
 	 */
 
-	if (record->xl_info & XLR_BKP_BLOCK(0))
-	{
-		obuffer = RestoreBackupBlock(lsn, record, 0, false, true);
-		if (samepage)
-		{
-			/* backup block covered both changes, so we're done */
-			UnlockReleaseBuffer(obuffer);
-			return;
-		}
-		goto newt;
-	}
-
 	/* Deal with old tuple version */
-
-	obuffer = XLogReadBuffer(xlrec->target.node,
-							 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
-							 false);
-	if (!BufferIsValid(obuffer))
-		goto newt;
-	page = (Page) BufferGetPage(obuffer);
-
-	if (lsn <= PageGetLSN(page))	/* changes are applied */
+	old_replay = XLogReplayBuffer(0, rnode, oldblk, &obuffer);
+	if (old_replay == BLK_NEEDS_REDO)
 	{
-		if (samepage)
-		{
-			UnlockReleaseBuffer(obuffer);
-			return;
-		}
-		goto newt;
-	}
-
-	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
-	if (PageGetMaxOffsetNumber(page) >= offnum)
-		lp = PageGetItemId(page, offnum);
+		page = (Page) BufferGetPage(obuffer);
 
-	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-		elog(PANIC, "heap_update_redo: invalid lp");
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
 
-	htup = (HeapTupleHeader) PageGetItem(page, lp);
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "heap_update_redo: invalid lp");
 
-	oldtup.t_data = htup;
-	oldtup.t_len = ItemIdGetLength(lp);
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
 
-	htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-	htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-	if (hot_update)
-		HeapTupleHeaderSetHotUpdated(htup);
-	else
-		HeapTupleHeaderClearHotUpdated(htup);
-	fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
-							   &htup->t_infomask2);
-	HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
-	HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-	/* Set forward chain link in t_ctid */
-	htup->t_ctid = xlrec->newtid;
+		oldtup.t_data = htup;
+		oldtup.t_len = ItemIdGetLength(lp);
 
-	/* Mark the page as a candidate for pruning */
-	PageSetPrunable(page, record->xl_xid);
+		htup->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		htup->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		if (hot_update)
+			HeapTupleHeaderSetHotUpdated(htup);
+		else
+			HeapTupleHeaderClearHotUpdated(htup);
+		fix_infomask_from_infobits(xlrec->old_infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
+		HeapTupleHeaderSetXmax(htup, xlrec->old_xmax);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		/* Set forward chain link in t_ctid */
+		htup->t_ctid = xlrec->newtid;
+
+		/* Mark the page as a candidate for pruning */
+		PageSetPrunable(page, record->xl_xid);
+
+		if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
 
-	if (xlrec->flags & XLOG_HEAP_ALL_VISIBLE_CLEARED)
-		PageClearAllVisible(page);
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(obuffer);
+	}
 
-	/*
-	 * this test is ugly, but necessary to avoid thinking that insert change
-	 * is already applied
-	 */
-	if (samepage)
+	if (oldblk == newblk)
 	{
 		nbuffer = obuffer;
-		goto newsame;
+		new_replay = old_replay;
 	}
+	else if (record->xl_info & XLOG_HEAP_INIT_PAGE)
+	{
+		new_replay = XLogReplayBufferExtended(1, rnode, MAIN_FORKNUM, newblk,
+											  RBM_ZERO, false, &nbuffer);
+		Assert (new_replay == BLK_NEEDS_REDO);
+		page = (Page) BufferGetPage(nbuffer);
 
-	PageSetLSN(page, lsn);
-	MarkBufferDirty(obuffer);
-
-	/* Deal with new tuple */
-
-newt:;
+		PageInit(page, BufferGetPageSize(nbuffer), 0);
+	}
+	else
+		new_replay = XLogReplayBuffer(1, rnode, newblk, &nbuffer);
 
 	/*
 	 * The visibility map may need to be fixed even if the heap page is
@@ -7896,144 +7831,107 @@ newt:;
 	if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
 	{
 		Relation	reln = CreateFakeRelcacheEntry(xlrec->target.node);
-		BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
 		Buffer		vmbuffer = InvalidBuffer;
 
-		visibilitymap_pin(reln, block, &vmbuffer);
-		visibilitymap_clear(reln, block, vmbuffer);
+		visibilitymap_pin(reln, newblk, &vmbuffer);
+		visibilitymap_clear(reln, newblk, vmbuffer);
 		ReleaseBuffer(vmbuffer);
 		FreeFakeRelcacheEntry(reln);
 	}
 
-	if (record->xl_info & XLR_BKP_BLOCK(1))
-	{
-		(void) RestoreBackupBlock(lsn, record, 1, false, false);
-		if (BufferIsValid(obuffer))
-			UnlockReleaseBuffer(obuffer);
-		return;
-	}
-
-	if (record->xl_info & XLOG_HEAP_INIT_PAGE)
+	/* Deal with new tuple */
+	if (new_replay == BLK_NEEDS_REDO)
 	{
-		nbuffer = XLogReadBuffer(xlrec->target.node,
-								 ItemPointerGetBlockNumber(&(xlrec->newtid)),
-								 true);
-		Assert(BufferIsValid(nbuffer));
 		page = (Page) BufferGetPage(nbuffer);
 
-		PageInit(page, BufferGetPageSize(nbuffer), 0);
-	}
-	else
-	{
-		nbuffer = XLogReadBuffer(xlrec->target.node,
-								 ItemPointerGetBlockNumber(&(xlrec->newtid)),
-								 false);
-		if (!BufferIsValid(nbuffer))
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
+		if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+			elog(PANIC, "heap_update_redo: invalid max offset number");
+
+		recdata = (char *) xlrec + SizeOfHeapUpdate;
+
+		if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD)
 		{
-			if (BufferIsValid(obuffer))
-				UnlockReleaseBuffer(obuffer);
-			return;
+			Assert(newblk == oldblk);
+			memcpy(&prefixlen, recdata, sizeof(uint16));
+			recdata += sizeof(uint16);
 		}
-		page = (Page) BufferGetPage(nbuffer);
-
-		if (lsn <= PageGetLSN(page))	/* changes are applied */
+		if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD)
 		{
-			UnlockReleaseBuffer(nbuffer);
-			if (BufferIsValid(obuffer))
-				UnlockReleaseBuffer(obuffer);
-			return;
+			Assert(newblk == oldblk);
+			memcpy(&suffixlen, recdata, sizeof(uint16));
+			recdata += sizeof(uint16);
 		}
-	}
-
-newsame:;
 
-	offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
-	if (PageGetMaxOffsetNumber(page) + 1 < offnum)
-		elog(PANIC, "heap_update_redo: invalid max offset number");
+		memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen);
+		recdata += SizeOfHeapHeaderLen;
 
-	recdata = (char *) xlrec + SizeOfHeapUpdate;
+		Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize);
+		htup = &tbuf.hdr;
+		MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
 
-	if (xlrec->flags & XLOG_HEAP_PREFIX_FROM_OLD)
-	{
-		Assert(samepage);
-		memcpy(&prefixlen, recdata, sizeof(uint16));
-		recdata += sizeof(uint16);
-	}
-	if (xlrec->flags & XLOG_HEAP_SUFFIX_FROM_OLD)
-	{
-		Assert(samepage);
-		memcpy(&suffixlen, recdata, sizeof(uint16));
-		recdata += sizeof(uint16);
-	}
+		/*
+		 * Reconstruct the new tuple using the prefix and/or suffix from the old
+		 * tuple, and the data stored in the WAL record.
+		 */
+		newp = (char *) htup + offsetof(HeapTupleHeaderData, t_bits);
+		if (prefixlen > 0)
+		{
+			int			len;
+
+			/* copy bitmap [+ padding] [+ oid] from WAL record */
+			len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+			memcpy(newp, recdata, len);
+			recdata += len;
+			newp += len;
+
+			/* copy prefix from old tuple */
+			memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
+			newp += prefixlen;
+
+			/* copy new tuple data from WAL record */
+			len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits));
+			memcpy(newp, recdata, len);
+			recdata += len;
+			newp += len;
+		}
+		else
+		{
+			/* copy bitmap [+ padding] [+ oid] + data from record, all in one go */
+			memcpy(newp, recdata, xlhdr.t_len);
+			recdata += xlhdr.t_len;
+			newp += xlhdr.t_len;
+		}
+		/* copy suffix from old tuple */
+		if (suffixlen > 0)
+			memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
 
-	memcpy((char *) &xlhdr, recdata, SizeOfHeapHeaderLen);
-	recdata += SizeOfHeapHeaderLen;
+		newlen = offsetof(HeapTupleHeaderData, t_bits) +xlhdr.t_len + prefixlen + suffixlen;
+		htup->t_infomask2 = xlhdr.header.t_infomask2;
+		htup->t_infomask = xlhdr.header.t_infomask;
+		htup->t_hoff = xlhdr.header.t_hoff;
 
-	Assert(xlhdr.t_len + prefixlen + suffixlen <= MaxHeapTupleSize);
-	htup = &tbuf.hdr;
-	MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+		HeapTupleHeaderSetXmin(htup, record->xl_xid);
+		HeapTupleHeaderSetCmin(htup, FirstCommandId);
+		HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
+		/* Make sure there is no forward chain link in t_ctid */
+		htup->t_ctid = xlrec->newtid;
 
-	/*
-	 * Reconstruct the new tuple using the prefix and/or suffix from the old
-	 * tuple, and the data stored in the WAL record.
-	 */
-	newp = (char *) htup + offsetof(HeapTupleHeaderData, t_bits);
-	if (prefixlen > 0)
-	{
-		int			len;
+		offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+		if (offnum == InvalidOffsetNumber)
+			elog(PANIC, "heap_update_redo: failed to add tuple");
 
-		/* copy bitmap [+ padding] [+ oid] from WAL record */
-		len = xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits);
-		memcpy(newp, recdata, len);
-		recdata += len;
-		newp += len;
+		if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
+			PageClearAllVisible(page);
 
-		/* copy prefix from old tuple */
-		memcpy(newp, (char *) oldtup.t_data + oldtup.t_data->t_hoff, prefixlen);
-		newp += prefixlen;
+		freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
 
-		/* copy new tuple data from WAL record */
-		len = xlhdr.t_len - (xlhdr.header.t_hoff - offsetof(HeapTupleHeaderData, t_bits));
-		memcpy(newp, recdata, len);
-		recdata += len;
-		newp += len;
-	}
-	else
-	{
-		/* copy bitmap [+ padding] [+ oid] + data from record, all in one go */
-		memcpy(newp, recdata, xlhdr.t_len);
-		recdata += xlhdr.t_len;
-		newp += xlhdr.t_len;
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(nbuffer);
 	}
-	/* copy suffix from old tuple */
-	if (suffixlen > 0)
-		memcpy(newp, (char *) oldtup.t_data + oldtup.t_len - suffixlen, suffixlen);
-
-	newlen = offsetof(HeapTupleHeaderData, t_bits) +xlhdr.t_len + prefixlen + suffixlen;
-	htup->t_infomask2 = xlhdr.header.t_infomask2;
-	htup->t_infomask = xlhdr.header.t_infomask;
-	htup->t_hoff = xlhdr.header.t_hoff;
-
-	HeapTupleHeaderSetXmin(htup, record->xl_xid);
-	HeapTupleHeaderSetCmin(htup, FirstCommandId);
-	HeapTupleHeaderSetXmax(htup, xlrec->new_xmax);
-	/* Make sure there is no forward chain link in t_ctid */
-	htup->t_ctid = xlrec->newtid;
-
-	offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
-	if (offnum == InvalidOffsetNumber)
-		elog(PANIC, "heap_update_redo: failed to add tuple");
-
-	if (xlrec->flags & XLOG_HEAP_NEW_ALL_VISIBLE_CLEARED)
-		PageClearAllVisible(page);
-
-	freespace = PageGetHeapFreeSpace(page);		/* needed to update FSM below */
-
-	PageSetLSN(page, lsn);
-	MarkBufferDirty(nbuffer);
-	UnlockReleaseBuffer(nbuffer);
-
-	if (BufferIsValid(obuffer) && obuffer != nbuffer)
+	if (BufferIsValid(nbuffer) && nbuffer != obuffer)
+		UnlockReleaseBuffer(nbuffer);
+	if (BufferIsValid(obuffer))
 		UnlockReleaseBuffer(obuffer);
 
 	/*
@@ -8047,11 +7945,11 @@ newsame:;
 	 * as it did before the update, assuming the new tuple is about the same
 	 * size as the old one.
 	 *
-	 * XXX: We don't get here if the page was restored from full page image.
+	 * XXX: Don't do this if the page was restored from full page image.
 	 * We don't bother to update the FSM in that case, it doesn't need to be
 	 * totally accurate anyway.
 	 */
-	if (!hot_update && freespace < BLCKSZ / 5)
+	if (new_replay == BLK_NEEDS_REDO && !hot_update && freespace < BLCKSZ / 5)
 		XLogRecordPageWithFreeSpace(xlrec->target.node,
 								 ItemPointerGetBlockNumber(&(xlrec->newtid)),
 									freespace);
@@ -8067,53 +7965,41 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record)
 	ItemId		lp = NULL;
 	HeapTupleHeader htup;
 
-	/* If we have a full-page image, restore it and we're done */
-	if (record->xl_info & XLR_BKP_BLOCK(0))
-	{
-		(void) RestoreBackupBlock(lsn, record, 0, false, false);
-		return;
-	}
-
-	buffer = XLogReadBuffer(xlrec->target.node,
-							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
-							false);
-	if (!BufferIsValid(buffer))
-		return;
-	page = (Page) BufferGetPage(buffer);
-
-	if (lsn <= PageGetLSN(page))	/* changes are applied */
+	if (XLogReplayBuffer(0, xlrec->target.node,
+						 ItemPointerGetBlockNumber(&xlrec->target.tid),
+						 &buffer) == BLK_NEEDS_REDO)
 	{
-		UnlockReleaseBuffer(buffer);
-		return;
-	}
+		page = (Page) BufferGetPage(buffer);
 
-	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
-	if (PageGetMaxOffsetNumber(page) >= offnum)
-		lp = PageGetItemId(page, offnum);
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
 
-	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-		elog(PANIC, "heap_lock_redo: invalid lp");
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "heap_lock_redo: invalid lp");
 
-	htup = (HeapTupleHeader) PageGetItem(page, lp);
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
 
-	fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
-							   &htup->t_infomask2);
+		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
 
-	/*
-	 * Clear relevant update flags, but only if the modified infomask says
-	 * there's no update.
-	 */
-	if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
-	{
-		HeapTupleHeaderClearHotUpdated(htup);
-		/* Make sure there is no forward chain link in t_ctid */
-		htup->t_ctid = xlrec->target.tid;
+		/*
+		 * Clear relevant update flags, but only if the modified infomask says
+		 * there's no update.
+		 */
+		if (HEAP_XMAX_IS_LOCKED_ONLY(htup->t_infomask))
+		{
+			HeapTupleHeaderClearHotUpdated(htup);
+			/* Make sure there is no forward chain link in t_ctid */
+			htup->t_ctid = xlrec->target.tid;
+		}
+		HeapTupleHeaderSetXmax(htup, xlrec->locking_xid);
+		HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
 	}
-	HeapTupleHeaderSetXmax(htup, xlrec->locking_xid);
-	HeapTupleHeaderSetCmax(htup, FirstCommandId, false);
-	PageSetLSN(page, lsn);
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 }
 
 static void
@@ -8127,42 +8013,27 @@ heap_xlog_lock_updated(XLogRecPtr lsn, XLogRecord *record)
 	ItemId		lp = NULL;
 	HeapTupleHeader htup;
 
-	/* If we have a full-page image, restore it and we're done */
-	if (record->xl_info & XLR_BKP_BLOCK(0))
-	{
-		(void) RestoreBackupBlock(lsn, record, 0, false, false);
-		return;
-	}
-
-	buffer = XLogReadBuffer(xlrec->target.node,
-							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
-							false);
-	if (!BufferIsValid(buffer))
-		return;
-	page = (Page) BufferGetPage(buffer);
-
-	if (lsn <= PageGetLSN(page))	/* changes are applied */
+	if (XLogReplayBuffer(0, xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), &buffer) == BLK_NEEDS_REDO)
 	{
-		UnlockReleaseBuffer(buffer);
-		return;
-	}
-
-	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
-	if (PageGetMaxOffsetNumber(page) >= offnum)
-		lp = PageGetItemId(page, offnum);
+		page = BufferGetPage(buffer);
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
 
-	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-		elog(PANIC, "heap_xlog_lock_updated: invalid lp");
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "heap_xlog_lock_updated: invalid lp");
 
-	htup = (HeapTupleHeader) PageGetItem(page, lp);
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
 
-	fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
-							   &htup->t_infomask2);
-	HeapTupleHeaderSetXmax(htup, xlrec->xmax);
+		fix_infomask_from_infobits(xlrec->infobits_set, &htup->t_infomask,
+								   &htup->t_infomask2);
+		HeapTupleHeaderSetXmax(htup, xlrec->xmax);
 
-	PageSetLSN(page, lsn);
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 }
 
 static void
@@ -8177,47 +8048,35 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record)
 	uint32		oldlen;
 	uint32		newlen;
 
-	/* If we have a full-page image, restore it and we're done */
-	if (record->xl_info & XLR_BKP_BLOCK(0))
+	if (XLogReplayBuffer(0, xlrec->target.node,
+						 ItemPointerGetBlockNumber(&(xlrec->target.tid)),
+						 &buffer) == BLK_NEEDS_REDO)
 	{
-		(void) RestoreBackupBlock(lsn, record, 0, false, false);
-		return;
-	}
+		page = BufferGetPage(buffer);
 
-	buffer = XLogReadBuffer(xlrec->target.node,
-							ItemPointerGetBlockNumber(&(xlrec->target.tid)),
-							false);
-	if (!BufferIsValid(buffer))
-		return;
-	page = (Page) BufferGetPage(buffer);
+		offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+		if (PageGetMaxOffsetNumber(page) >= offnum)
+			lp = PageGetItemId(page, offnum);
 
-	if (lsn <= PageGetLSN(page))	/* changes are applied */
-	{
-		UnlockReleaseBuffer(buffer);
-		return;
-	}
+		if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
+			elog(PANIC, "heap_inplace_redo: invalid lp");
 
-	offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
-	if (PageGetMaxOffsetNumber(page) >= offnum)
-		lp = PageGetItemId(page, offnum);
+		htup = (HeapTupleHeader) PageGetItem(page, lp);
 
-	if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))
-		elog(PANIC, "heap_inplace_redo: invalid lp");
+		oldlen = ItemIdGetLength(lp) - htup->t_hoff;
+		newlen = record->xl_len - SizeOfHeapInplace;
+		if (oldlen != newlen)
+			elog(PANIC, "heap_inplace_redo: wrong tuple length");
 
-	htup = (HeapTupleHeader) PageGetItem(page, lp);
-
-	oldlen = ItemIdGetLength(lp) - htup->t_hoff;
-	newlen = record->xl_len - SizeOfHeapInplace;
-	if (oldlen != newlen)
-		elog(PANIC, "heap_inplace_redo: wrong tuple length");
-
-	memcpy((char *) htup + htup->t_hoff,
-		   (char *) xlrec + SizeOfHeapInplace,
-		   newlen);
+		memcpy((char *) htup + htup->t_hoff,
+			   (char *) xlrec + SizeOfHeapInplace,
+			   newlen);
 
-	PageSetLSN(page, lsn);
-	MarkBufferDirty(buffer);
-	UnlockReleaseBuffer(buffer);
+		PageSetLSN(page, lsn);
+		MarkBufferDirty(buffer);
+	}
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
 }
 
 void
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 34f2fc0..881652d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4039,7 +4039,7 @@ RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
 	}
 
 	/* Caller specified a bogus block_index */
-	elog(ERROR, "failed to restore block_index %d", block_index);
+	elog(PANIC, "failed to restore block_index %d", block_index);
 	return InvalidBuffer;		/* keep compiler quiet */
 }
 
@@ -6844,6 +6844,8 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
+				XLogRedoLSN = EndRecPtr;
+				XLogRedoRecord = record;
 				RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
 
 				/* Pop the error context stack */
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index b7829ff..4ef2ca8 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -25,6 +25,11 @@
 #include "utils/hsearch.h"
 #include "utils/rel.h"
 
+/*
+ * WAL record currently being replayed.
+ */
+XLogRecPtr XLogRedoLSN;
+XLogRecord *XLogRedoRecord;
 
 /*
  * During XLOG replay, we may see XLOG records for incremental updates of
@@ -242,6 +247,78 @@ XLogCheckInvalidPages(void)
 	invalid_page_tab = NULL;
 }
 
+
+/*
+ * XLogReplayBuffer
+ *		Read a page during XLOG replay
+ *
+ * Reads a block referenced by a WAL record into shared buffer cache, and
+ * determines what needs to be done to replay the changes to it. If the
+ * WAL record includes a full-page image of the page, it is restored.
+ *
+ * (Getting the buffer lock is not really necessary during single-process
+ * crash recovery, but some subroutines such as MarkBufferDirty will complain
+ * if we don't have the lock.  In hot standby mode it's definitely necessary.)
+ * Returns one of the following:
+ *
+ * The returned buffer is exclusively-locked.
+ *	BLK_REPLAY		- block needs to be replayed
+ *	BLK_DONE		- block doesn't need replaying
+ *	BLK_RESTORED	- block was restored from a full-page image included in
+ *					  the record
+ *	BLK_NOTFOUND	- block was not found (because it was truncated away by
+ *					  an operation later in the WAL stream)
+ *
+ * On return, the buffer is locked in exclusive-mode, and returned in *buf.
+ * Note that the buffer is locked and returned even if it doesn't need
+ * replaying.
+ */
+XLogReplayResult
+XLogReplayBuffer(int block_index, RelFileNode rnode, BlockNumber blkno, Buffer *buf)
+{
+	return XLogReplayBufferExtended(block_index,
+									rnode, MAIN_FORKNUM, blkno,
+									RBM_NORMAL, false, buf);
+}
+
+/*
+ * XLogReplayBufferExtended
+ *		Like XLogReplayBuffer, but with extra options.
+ *
+ * If mode is RBM_ZERO or RBM_ZERO_ON_ERROR, if the page doesn't exist, the
+ * relation is extended with all-zeroes pages up to the referenced block
+ * number. In RBM_ZERO mode, the return values is always BLK_NEEDS_REDO.
+ *
+ * If 'get_cleanup_lock' is true, a "cleanup lock" is acquired on the buffer
+ * using LockBufferForCleanup(), instead of a regulare exclusive lock.
+ */
+XLogReplayResult
+XLogReplayBufferExtended(int block_index,
+						 RelFileNode rnode, ForkNumber forkno, BlockNumber blkno,
+						 ReadBufferMode mode, bool get_cleanup_lock,
+						 Buffer *buf)
+{
+	if (XLogRedoRecord->xl_info & XLR_BKP_BLOCK(block_index))
+	{
+		*buf = RestoreBackupBlock(XLogRedoLSN, XLogRedoRecord, block_index, get_cleanup_lock, true);
+		return BLK_RESTORED;
+	}
+	else
+	{
+		*buf = XLogReadBufferExtended(rnode, forkno, blkno, mode);
+		if (BufferIsValid(*buf))
+		{
+			LockBuffer(*buf, BUFFER_LOCK_EXCLUSIVE);
+			if (XLogRedoLSN <= PageGetLSN(BufferGetPage(*buf)))
+				return BLK_DONE;
+			else
+				return BLK_NEEDS_REDO;
+		}
+		else
+			return BLK_NOTFOUND;
+	}
+}
+
 /*
  * XLogReadBuffer
  *		Read a page during XLOG replay.
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 58f11d9..5fda2c6 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -1,7 +1,7 @@
 /*
  * xlogutils.h
  *
- * PostgreSQL transaction log manager utility routines
+ * Utilities for replaying WAL records.
  *
  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -22,6 +22,27 @@ extern void XLogDropDatabase(Oid dbid);
 extern void XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum,
 					 BlockNumber nblocks);
 
+/* Result codes for XLogReplayBuffer[Extended] */
+typedef enum
+{
+	BLK_NEEDS_REDO,		/* block needs to be replayed */
+	BLK_DONE,			/* block was already replayed */
+	BLK_RESTORED,		/* block was restored from a full-page image */
+	BLK_NOTFOUND		/* block was not found (and hence does not need to be
+						 * replayed) */
+} XLogReplayResult;
+
+extern XLogRecPtr XLogRedoLSN;
+extern struct XLogRecord *XLogRedoRecord;
+
+extern XLogReplayResult XLogReplayBuffer(int block_index,
+				 RelFileNode rnode, BlockNumber blkno, Buffer *buf);
+extern XLogReplayResult XLogReplayBufferExtended(int block_index,
+						 RelFileNode rnode, ForkNumber forkno,
+						 BlockNumber blkno,
+						 ReadBufferMode mode, bool get_cleanup_lock,
+						 Buffer *buf);
+
 extern Buffer XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init);
 extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
 					   BlockNumber blkno, ReadBufferMode mode);
