From b280f32f789a919d87978ea945de9521da837b9a Mon Sep 17 00:00:00 2001
From: Antonin Houska <ah@cybertec.at>
Date: Fri, 5 Jul 2019 16:24:01 +0200
Subject: [PATCH 10/17] Introduce "transient buffered file".

Currently we need it only to encrypt transaction data that gets spilled to
disk during logical decoding. This way we hide the encryption-specific details
from reorderbuffer.c, as well as from any other (future) user of transient
files that need to be encrypted.

The next patch of the series implements the actual encryption of those files.
---
 src/backend/postmaster/pgstat.c                 |   6 -
 src/backend/replication/logical/reorderbuffer.c | 457 ++++++++----------
 src/backend/storage/file/buffile.c              | 595 ++++++++++++++++++------
 src/include/pgstat.h                            |   2 -
 src/include/replication/reorderbuffer.h         |   4 -
 src/include/storage/buffile.h                   |  13 +-
 6 files changed, 652 insertions(+), 425 deletions(-)

diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index b4f2b28b51..54786899fd 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -4004,12 +4004,6 @@ pgstat_get_wait_io(WaitEventIO w)
 		case WAIT_EVENT_RELATION_MAP_WRITE:
 			event_name = "RelationMapWrite";
 			break;
-		case WAIT_EVENT_REORDER_BUFFER_READ:
-			event_name = "ReorderBufferRead";
-			break;
-		case WAIT_EVENT_REORDER_BUFFER_WRITE:
-			event_name = "ReorderBufferWrite";
-			break;
 		case WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ:
 			event_name = "ReorderLogicalMappingRead";
 			break;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e7c32f2a13..844b67d44b 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -71,6 +71,7 @@
 #include "replication/slot.h"
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
+#include "storage/buffile.h"
 #include "storage/fd.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
@@ -109,7 +110,7 @@ typedef struct ReorderBufferIterTXNEntry
 	XLogRecPtr	lsn;
 	ReorderBufferChange *change;
 	ReorderBufferTXN *txn;
-	int			fd;
+	TransientBufFile *file;
 	XLogSegNo	segno;
 } ReorderBufferIterTXNEntry;
 
@@ -192,11 +193,17 @@ static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTX
 static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
-										 int fd, ReorderBufferChange *change);
+							 TransientBufFile *file, ReorderBufferChange *change);
+static void ReorderBufferWriteData(TransientBufFile *file, void *ptr, size_t size,
+					   ReorderBufferTXN *txn);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-										int *fd, XLogSegNo *segno);
+							TransientBufFile **file, XLogSegNo *segno);
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
-									   char *change);
+						   TransientBufFile **file);
+static ReorderBufferTupleBuf *ReorderBufferRestoreTuple(ReorderBuffer *rb,
+						  TransientBufFile *file);
+static void ReorderBufferReadData(TransientBufFile *file, void *ptr, size_t size,
+					  bool *no_data_p);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
@@ -267,9 +274,6 @@ ReorderBufferAllocate(void)
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
-	buffer->outbuf = NULL;
-	buffer->outbufsize = 0;
-
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
 	dlist_init(&buffer->toplevel_by_lsn);
@@ -988,7 +992,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 	for (off = 0; off < state->nr_txns; off++)
 	{
-		state->entries[off].fd = -1;
+		state->entries[off].file = NULL;
 		state->entries[off].segno = 0;
 	}
 
@@ -1013,7 +1017,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		{
 			/* serialize remaining changes */
 			ReorderBufferSerializeTXN(rb, txn);
-			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd,
+			ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
 										&state->entries[off].segno);
 		}
 
@@ -1043,7 +1047,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 				/* serialize remaining changes */
 				ReorderBufferSerializeTXN(rb, cur_txn);
 				ReorderBufferRestoreChanges(rb, cur_txn,
-											&state->entries[off].fd,
+											&state->entries[off].file,
 											&state->entries[off].segno);
 			}
 			cur_change = dlist_head_element(ReorderBufferChange, node,
@@ -1124,7 +1128,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		dlist_delete(&change->node);
 		dlist_push_tail(&state->old_change, &change->node);
 
-		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd,
+		if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
 										&state->entries[off].segno))
 		{
 			/* successfully restored changes from disk */
@@ -1163,8 +1167,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 
 	for (off = 0; off < state->nr_txns; off++)
 	{
-		if (state->entries[off].fd != -1)
-			CloseTransientFile(state->entries[off].fd);
+		if (state->entries[off].file)
+			BufFileCloseTransient(state->entries[off].file);
 	}
 
 	/* free memory we might have "leaked" in the last *Next call */
@@ -2212,24 +2216,6 @@ ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
  */
 
 /*
- * Ensure the IO buffer is >= sz.
- */
-static void
-ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
-{
-	if (!rb->outbufsize)
-	{
-		rb->outbuf = MemoryContextAlloc(rb->context, sz);
-		rb->outbufsize = sz;
-	}
-	else if (rb->outbufsize < sz)
-	{
-		rb->outbuf = repalloc(rb->outbuf, sz);
-		rb->outbufsize = sz;
-	}
-}
-
-/*
  * Check whether the transaction tx should spill its data to disk.
  */
 static void
@@ -2254,7 +2240,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	dlist_iter	subtxn_i;
 	dlist_mutable_iter change_i;
-	int			fd = -1;
+	TransientBufFile *file = NULL;
 	XLogSegNo	curOpenSegNo = 0;
 	Size		spilled = 0;
 
@@ -2281,13 +2267,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		 * store in segment in which it belongs by start lsn, don't split over
 		 * multiple segments tho
 		 */
-		if (fd == -1 ||
+		if (file == NULL ||
 			!XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
 		{
 			char		path[MAXPGPATH];
 
-			if (fd != -1)
-				CloseTransientFile(fd);
+			if (file)
+				BufFileCloseTransient(file);
 
 			XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
 
@@ -2299,16 +2285,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 										curOpenSegNo);
 
 			/* open segment, create it if necessary */
-			fd = OpenTransientFile(path,
-								   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
-
-			if (fd < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not open file \"%s\": %m", path)));
+			file = BufFileOpenTransient(path,
+										O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
 		}
 
-		ReorderBufferSerializeChange(rb, txn, fd, change);
+		ReorderBufferSerializeChange(rb, txn, file, change);
 		dlist_delete(&change->node);
 		ReorderBufferReturnChange(rb, change);
 
@@ -2320,8 +2301,8 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	txn->nentries_mem = 0;
 	txn->serialized = true;
 
-	if (fd != -1)
-		CloseTransientFile(fd);
+	if (file)
+		BufFileCloseTransient(file);
 }
 
 /*
@@ -2329,15 +2310,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
  */
 static void
 ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							 int fd, ReorderBufferChange *change)
+							 TransientBufFile *file, ReorderBufferChange *change)
 {
-	ReorderBufferDiskChange *ondisk;
+	ReorderBufferDiskChange hdr;
 	Size		sz = sizeof(ReorderBufferDiskChange);
 
-	ReorderBufferSerializeReserve(rb, sz);
-
-	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-	memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
+	memcpy((char *) &hdr + offsetof(ReorderBufferDiskChange, change),
+		   change, sizeof(ReorderBufferChange));
 
 	switch (change->action)
 	{
@@ -2347,7 +2326,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_DELETE:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
 			{
-				char	   *data;
 				ReorderBufferTupleBuf *oldtup,
 						   *newtup;
 				Size		oldlen = 0;
@@ -2370,66 +2348,55 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					sz += newlen;
 				}
 
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+				hdr.size = sz;
+				ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange),
+									   txn);
 
 				if (oldlen)
 				{
-					memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
-					data += sizeof(HeapTupleData);
-
-					memcpy(data, oldtup->tuple.t_data, oldlen);
-					data += oldlen;
+					ReorderBufferWriteData(file, &oldtup->tuple,
+										   sizeof(HeapTupleData), txn);
+					ReorderBufferWriteData(file, oldtup->tuple.t_data, oldlen,
+										   txn);
 				}
 
 				if (newlen)
 				{
-					memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
-					data += sizeof(HeapTupleData);
-
-					memcpy(data, newtup->tuple.t_data, newlen);
-					data += newlen;
+					ReorderBufferWriteData(file, &newtup->tuple,
+										   sizeof(HeapTupleData), txn);
+					ReorderBufferWriteData(file, newtup->tuple.t_data, newlen,
+										   txn);
 				}
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_MESSAGE:
 			{
-				char	   *data;
 				Size		prefix_size = strlen(change->data.msg.prefix) + 1;
 
 				sz += prefix_size + change->data.msg.message_size +
 					sizeof(Size) + sizeof(Size);
-				ReorderBufferSerializeReserve(rb, sz);
 
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+				hdr.size = sz;
+				ReorderBufferWriteData(file, &hdr,
+									   sizeof(ReorderBufferDiskChange),
+									   txn);
 
 				/* write the prefix including the size */
-				memcpy(data, &prefix_size, sizeof(Size));
-				data += sizeof(Size);
-				memcpy(data, change->data.msg.prefix,
-					   prefix_size);
-				data += prefix_size;
+				ReorderBufferWriteData(file, &prefix_size, sizeof(Size), txn);
+				ReorderBufferWriteData(file, change->data.msg.prefix,
+									   prefix_size, txn);
 
 				/* write the message including the size */
-				memcpy(data, &change->data.msg.message_size, sizeof(Size));
-				data += sizeof(Size);
-				memcpy(data, change->data.msg.message,
-					   change->data.msg.message_size);
-				data += change->data.msg.message_size;
+				ReorderBufferWriteData(file, &change->data.msg.message_size,
+									   sizeof(Size), txn);
+				ReorderBufferWriteData(file, change->data.msg.message,
+									   change->data.msg.message_size, txn);
 
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	snap;
-				char	   *data;
 
 				snap = change->data.snapshot;
 
@@ -2438,49 +2405,37 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					sizeof(TransactionId) * snap->subxcnt
 					;
 
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+				hdr.size = sz;
+				ReorderBufferWriteData(file, &hdr,
+									   sizeof(ReorderBufferDiskChange), txn);
 
-				memcpy(data, snap, sizeof(SnapshotData));
-				data += sizeof(SnapshotData);
+				ReorderBufferWriteData(file, snap, sizeof(SnapshotData), txn);
 
 				if (snap->xcnt)
-				{
-					memcpy(data, snap->xip,
-						   sizeof(TransactionId) * snap->xcnt);
-					data += sizeof(TransactionId) * snap->xcnt;
-				}
+					ReorderBufferWriteData(file, snap->xip,
+										   sizeof(TransactionId) * snap->xcnt,
+										   txn);
 
 				if (snap->subxcnt)
-				{
-					memcpy(data, snap->subxip,
-						   sizeof(TransactionId) * snap->subxcnt);
-					data += sizeof(TransactionId) * snap->subxcnt;
-				}
+					ReorderBufferWriteData(file, snap->subxip,
+										   sizeof(TransactionId) * snap->subxcnt,
+										   txn);
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_TRUNCATE:
 			{
 				Size		size;
-				char	   *data;
 
 				/* account for the OIDs of truncated relations */
 				size = sizeof(Oid) * change->data.truncate.nrelids;
 				sz += size;
 
-				/* make sure we have enough space */
-				ReorderBufferSerializeReserve(rb, sz);
-
-				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
-				/* might have been reallocated above */
-				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-				memcpy(data, change->data.truncate.relids, size);
-				data += size;
+				hdr.size = sz;
+				ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange),
+									   txn);
 
+				ReorderBufferWriteData(file, change->data.truncate.relids, size,
+									   txn);
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -2489,27 +2444,29 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			/* ReorderBufferChange contains everything important */
+
+			/*
+			 * There is no payload, but the header must still be written:
+			 * ReorderBufferRestoreChange() reads one header per change.
+			 */
+			hdr.size = sz;
+			ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange),
+								   txn);
 			break;
 	}
+}
 
-	ondisk->size = sz;
-
-	errno = 0;
-	pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
-	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		/* if write didn't set errno, assume problem is no disk space */
-		errno = save_errno ? save_errno : ENOSPC;
+/*
+ * Wrapper for BufFileWriteTransient() that raises ERROR if the whole chunk
+ * was not written. XXX Should this be a macro?
+ */
+static void
+ReorderBufferWriteData(TransientBufFile *file, void *ptr, size_t size,
+					   ReorderBufferTXN *txn)
+{
+	if (BufFileWriteTransient(file, ptr, size) != size)
 		ereport(ERROR,
 				(errcode_for_file_access(),
 				 errmsg("could not write to data file for XID %u: %m",
 						txn->xid)));
-	}
-	pgstat_report_wait_end();
-
-	Assert(ondisk->change.action == change->action);
 }
 
 /*
@@ -2517,7 +2466,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
  */
 static Size
 ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
-							int *fd, XLogSegNo *segno)
+							TransientBufFile **file, XLogSegNo *segno)
 {
 	Size		restored = 0;
 	XLogSegNo	last_segno;
@@ -2542,10 +2491,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	while (restored < max_changes_in_memory && *segno <= last_segno)
 	{
-		int			readBytes;
-		ReorderBufferDiskChange *ondisk;
-
-		if (*fd == -1)
+		if (*file == NULL)
 		{
 			char		path[MAXPGPATH];
 
@@ -2562,77 +2508,24 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
 										*segno);
 
-			*fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
-			if (*fd < 0 && errno == ENOENT)
+			*file = BufFileOpenTransient(path, O_RDONLY | PG_BINARY);
+			if (*file == NULL)
 			{
-				*fd = -1;
+				Assert(errno == ENOENT);
 				(*segno)++;
 				continue;
 			}
-			else if (*fd < 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not open file \"%s\": %m",
-								path)));
 		}
 
-		/*
-		 * Read the statically sized part of a change which has information
-		 * about the total size. If we couldn't read a record, we're at the
-		 * end of this file.
-		 */
-		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
-		readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_end();
-
-		/* eof */
-		if (readBytes == 0)
+		ReorderBufferRestoreChange(rb, txn, file);
+		if (*file)
+			restored++;
+		else
 		{
-			CloseTransientFile(*fd);
-			*fd = -1;
+			/* No data could be restored. */
 			(*segno)++;
 			continue;
 		}
-		else if (readBytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: %m")));
-		else if (readBytes != sizeof(ReorderBufferDiskChange))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
-							readBytes,
-							(uint32) sizeof(ReorderBufferDiskChange))));
-
-		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-		ReorderBufferSerializeReserve(rb,
-									  sizeof(ReorderBufferDiskChange) + ondisk->size);
-		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-		pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ);
-		readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange),
-						 ondisk->size - sizeof(ReorderBufferDiskChange));
-		pgstat_report_wait_end();
-
-		if (readBytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: %m")));
-		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
-							readBytes,
-							(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
-
-		/*
-		 * ok, read a full change from disk, now restore it into proper
-		 * in-memory format
-		 */
-		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
-		restored++;
 	}
 
 	return restored;
@@ -2642,25 +2535,36 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
  * Convert change from its on-disk format to in-memory format and queue it onto
  * the TXN's ->changes list.
  *
- * Note: although "data" is declared char*, at entry it points to a
- * maxalign'd buffer, making it safe in most of this function to assume
- * that the pointed-to data is suitably aligned for direct access.
+ * If no data was found in the file, close it and set *file to NULL.
  */
 static void
 ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
-						   char *data)
+						   TransientBufFile **file)
 {
-	ReorderBufferDiskChange *ondisk;
+	ReorderBufferDiskChange ondisk;
+	bool		no_data;
 	ReorderBufferChange *change;
 
-	ondisk = (ReorderBufferDiskChange *) data;
+	/*
+	 * Read the statically sized part of a change which has information about
+	 * the total size. If we couldn't read a record, we're at the end of this
+	 * file.
+	 */
+	ReorderBufferReadData(*file, &ondisk, sizeof(ReorderBufferDiskChange),
+						  &no_data);
+
+	/* eof */
+	if (no_data)
+	{
+		BufFileCloseTransient(*file);
+		*file = NULL;
+		return;
+	}
 
 	change = ReorderBufferGetChange(rb);
 
 	/* copy static part */
-	memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));
-
-	data += sizeof(ReorderBufferDiskChange);
+	memcpy(change, &ondisk.change, sizeof(ReorderBufferChange));
 
 	/* restore individual stuff */
 	switch (change->action)
@@ -2671,50 +2575,10 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_DELETE:
 		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
 			if (change->data.tp.oldtuple)
-			{
-				uint32		tuplelen = ((HeapTuple) data)->t_len;
-
-				change->data.tp.oldtuple =
-					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
-
-				/* restore ->tuple */
-				memcpy(&change->data.tp.oldtuple->tuple, data,
-					   sizeof(HeapTupleData));
-				data += sizeof(HeapTupleData);
-
-				/* reset t_data pointer into the new tuplebuf */
-				change->data.tp.oldtuple->tuple.t_data =
-					ReorderBufferTupleBufData(change->data.tp.oldtuple);
-
-				/* restore tuple data itself */
-				memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
-				data += tuplelen;
-			}
+				change->data.tp.oldtuple = ReorderBufferRestoreTuple(rb, *file);
 
 			if (change->data.tp.newtuple)
-			{
-				/* here, data might not be suitably aligned! */
-				uint32		tuplelen;
-
-				memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
-					   sizeof(uint32));
-
-				change->data.tp.newtuple =
-					ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
-
-				/* restore ->tuple */
-				memcpy(&change->data.tp.newtuple->tuple, data,
-					   sizeof(HeapTupleData));
-				data += sizeof(HeapTupleData);
-
-				/* reset t_data pointer into the new tuplebuf */
-				change->data.tp.newtuple->tuple.t_data =
-					ReorderBufferTupleBufData(change->data.tp.newtuple);
-
-				/* restore tuple data itself */
-				memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
-				data += tuplelen;
-			}
+				change->data.tp.newtuple = ReorderBufferRestoreTuple(rb, *file);
 
 			break;
 		case REORDER_BUFFER_CHANGE_MESSAGE:
@@ -2722,44 +2586,44 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				Size		prefix_size;
 
 				/* read prefix */
-				memcpy(&prefix_size, data, sizeof(Size));
-				data += sizeof(Size);
+				ReorderBufferReadData(*file, &prefix_size, sizeof(Size), NULL);
 				change->data.msg.prefix = MemoryContextAlloc(rb->context,
 															 prefix_size);
-				memcpy(change->data.msg.prefix, data, prefix_size);
+				ReorderBufferReadData(*file, change->data.msg.prefix,
+									  prefix_size, NULL);
 				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
-				data += prefix_size;
 
 				/* read the message */
-				memcpy(&change->data.msg.message_size, data, sizeof(Size));
-				data += sizeof(Size);
+				ReorderBufferReadData(*file, &change->data.msg.message_size,
+									  sizeof(Size), NULL);
 				change->data.msg.message = MemoryContextAlloc(rb->context,
 															  change->data.msg.message_size);
-				memcpy(change->data.msg.message, data,
-					   change->data.msg.message_size);
-				data += change->data.msg.message_size;
+				ReorderBufferReadData(*file, change->data.msg.message,
+									  change->data.msg.message_size, NULL);
 
 				break;
 			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
-				Snapshot	oldsnap;
+				SnapshotData oldsnap;
 				Snapshot	newsnap;
 				Size		size;
 
-				oldsnap = (Snapshot) data;
+				ReorderBufferReadData(*file, &oldsnap, sizeof(SnapshotData), NULL);
 
 				size = sizeof(SnapshotData) +
-					sizeof(TransactionId) * oldsnap->xcnt +
-					sizeof(TransactionId) * (oldsnap->subxcnt + 0);
+					sizeof(TransactionId) * oldsnap.xcnt +
+					sizeof(TransactionId) * (oldsnap.subxcnt + 0);
 
 				change->data.snapshot = MemoryContextAllocZero(rb->context, size);
 
 				newsnap = change->data.snapshot;
 
-				memcpy(newsnap, data, size);
+				memcpy(newsnap, &oldsnap, sizeof(SnapshotData));
 				newsnap->xip = (TransactionId *)
 					(((char *) newsnap) + sizeof(SnapshotData));
+				ReorderBufferReadData(*file, newsnap->xip,
+									  size - sizeof(SnapshotData), NULL);
 				newsnap->subxip = newsnap->xip + newsnap->xcnt;
 				newsnap->copied = true;
 				break;
@@ -2771,7 +2635,9 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 				relids = ReorderBufferGetRelids(rb,
 												change->data.truncate.nrelids);
-				memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
+				ReorderBufferReadData(*file, relids,
+									  change->data.truncate.nrelids * sizeof(Oid),
+									  NULL);
 				change->data.truncate.relids = relids;
 
 				break;
@@ -2787,6 +2653,77 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 }
 
 /*
+ * Convert heap tuple from its on-disk format to in-memory format.
+ */
+static ReorderBufferTupleBuf *
+ReorderBufferRestoreTuple(ReorderBuffer *rb, TransientBufFile *file)
+{
+	HeapTupleData tupdata;
+	uint32		tuplelen;
+	ReorderBufferTupleBuf *result;
+
+	ReorderBufferReadData(file, &tupdata, sizeof(HeapTupleData), NULL);
+	tuplelen = tupdata.t_len;
+
+	result = ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+	/* restore ->tuple from the HeapTupleData read above */
+	memcpy(&result->tuple, &tupdata, sizeof(HeapTupleData));
+
+	/* reset t_data pointer into the new tuplebuf, replacing the stored value */
+	result->tuple.t_data = ReorderBufferTupleBufData(result);
+
+	/* restore tuple data itself; with NULL, a short read raises ERROR */
+	ReorderBufferReadData(file, result->tuple.t_data, tuplelen, NULL);
+
+	return result;
+}
+
+/*
+ * Read exactly "size" bytes from "file" into "ptr" via BufFileReadTransient(),
+ * raising ERROR if a different number of bytes was read.
+ *
+ * If a valid pointer is passed for no_data_p, *no_data_p is set to true when
+ * zero bytes were read (no ERROR is raised then) and to false after a
+ * complete read. If NULL is passed, missing data is not tolerated.
+ */
+static void
+ReorderBufferReadData(TransientBufFile *file, void *ptr, size_t size,
+					  bool *no_data_p)
+{
+	int			readBytes;
+
+	/*
+	 * Caller should not request zero bytes. This assumption simplifies
+	 * setting of *no_data_p below.
+	 *
+	 * NOTE(review): callers pass sizes read from the file (message size,
+	 * snapshot xid arrays); confirm that these can never be zero.
+	 */
+	Assert(size > 0);
+
+	if ((readBytes = BufFileReadTransient(file, ptr, size)) != size)
+	{
+		if (no_data_p)
+			*no_data_p = readBytes == 0;
+
+		/* Zero bytes is o.k. if the caller passed a valid no_data_p. */
+		if (no_data_p && *no_data_p)
+			return;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
+						readBytes, (uint32) size)));
+	}
+	else if (no_data_p)
+	{
+		/* Given that size is non-zero, readBytes must be non-zero too. */
+		*no_data_p = false;
+	}
+}
+
+/*
  * Remove all on-disk stored for the passed in transaction.
  */
 static void
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index b773a76049..b31324627a 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -41,6 +41,8 @@
 
 #include "postgres.h"
 
+#include <unistd.h>
+
 #include "commands/tablespace.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
@@ -59,19 +61,44 @@
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
 /*
+ * Fields that both BufFile and TransientBufFile structures need. This struct
+ * must be the first member of each of those structures.
+ */
+typedef struct BufFileCommon
+{
+	bool		dirty;			/* does buffer need to be written? */
+	int			pos;			/* next read/write position in buffer */
+	int			nbytes;			/* total # of valid bytes in buffer */
+
+	/*
+	 * "current pos" is position of start of buffer within the logical file.
+	 * Position as seen by user of BufFile is (curFile, curOffset + pos).
+	 */
+	int			curFile;		/* file index (0..n) part of current pos,
+								 * always zero for TransientBufFile */
+	off_t		curOffset;		/* offset part of current pos */
+
+	bool		readOnly;		/* has the file been set to read only? */
+
+	bool		append;			/* should new data be appended to the end? */
+
+	PGAlignedBlock buffer;		/* the buffered data itself */
+} BufFileCommon;
+
+/*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
  * managed by fd.c).
  */
 struct BufFile
 {
+	BufFileCommon common;		/* Common fields, see above. */
+
 	int			numFiles;		/* number of physical files in set */
 	/* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */
 	File	   *files;			/* palloc'd array with numFiles entries */
 
 	bool		isInterXact;	/* keep open over transactions? */
-	bool		dirty;			/* does buffer need to be written? */
-	bool		readOnly;		/* has the file been set to read only? */
 
 	SharedFileSet *fileset;		/* space for segment files if shared */
 	const char *name;			/* name of this BufFile if shared */
@@ -82,16 +109,20 @@ struct BufFile
 	 * because after creation we only repalloc our arrays larger.)
 	 */
 	ResourceOwner resowner;
+};
 
-	/*
-	 * "current pos" is position of start of buffer within the logical file.
-	 * Position as seen by user of BufFile is (curFile, curOffset + pos).
-	 */
-	int			curFile;		/* file index (0..n) part of current pos */
-	off_t		curOffset;		/* offset part of current pos */
-	int			pos;			/* next read/write position in buffer */
-	int			nbytes;			/* total # of valid bytes in buffer */
-	PGAlignedBlock buffer;
+/*
+ * Buffered variant of a transient file. It is simpler than BufFile in
+ * several ways: 1) it's not split into segments, 2) there's no need to seek,
+ * 3) there's no need to combine read and write access.
+ */
+struct TransientBufFile
+{
+	BufFileCommon common;		/* Common fields, see above. */
+
+	/* The underlying file. */
+	char	   *path;
+	int			fd;
 };
 
 static BufFile *makeBufFileCommon(int nfiles);
@@ -102,22 +133,32 @@ static void BufFileDumpBuffer(BufFile *file);
 static int	BufFileFlush(BufFile *file);
 static File MakeNewSharedSegment(BufFile *file, int segment);
 
+static void BufFileLoadBufferTransient(TransientBufFile *file);
+static void BufFileDumpBufferTransient(TransientBufFile *file);
+
+static size_t BufFileReadCommon(BufFileCommon *file, void *ptr, size_t size,
+				  bool is_transient);
+static size_t BufFileWriteCommon(BufFileCommon *file, void *ptr, size_t size,
+				   bool is_transient);
+
 /*
  * Create BufFile and perform the common initialization.
  */
 static BufFile *
 makeBufFileCommon(int nfiles)
 {
-	BufFile    *file = (BufFile *) palloc(sizeof(BufFile));
+	BufFile    *file = (BufFile *) palloc0(sizeof(BufFile));
+	BufFileCommon *fcommon = &file->common;
+
+	fcommon->dirty = false;
+	fcommon->curFile = 0;
+	fcommon->curOffset = 0L;
+	fcommon->pos = 0;
+	fcommon->nbytes = 0;
 
 	file->numFiles = nfiles;
 	file->isInterXact = false;
-	file->dirty = false;
 	file->resowner = CurrentResourceOwner;
-	file->curFile = 0;
-	file->curOffset = 0L;
-	file->pos = 0;
-	file->nbytes = 0;
 
 	return file;
 }
@@ -133,7 +174,7 @@ makeBufFile(File firstfile)
 
 	file->files = (File *) palloc(sizeof(File));
 	file->files[0] = firstfile;
-	file->readOnly = false;
+	file->common.readOnly = false;
 	file->fileset = NULL;
 	file->name = NULL;
 
@@ -264,7 +305,7 @@ BufFileCreateShared(SharedFileSet *fileset, const char *name)
 	file->name = pstrdup(name);
 	file->files = (File *) palloc(sizeof(File));
 	file->files[0] = MakeNewSharedSegment(file, 0);
-	file->readOnly = false;
+	file->common.readOnly = false;
 
 	return file;
 }
@@ -321,7 +362,7 @@ BufFileOpenShared(SharedFileSet *fileset, const char *name)
 
 	file = makeBufFileCommon(nfiles);
 	file->files = files;
-	file->readOnly = true;		/* Can't write to files opened this way */
+	file->common.readOnly = true;	/* Can't write to files opened this way */
 	file->fileset = fileset;
 	file->name = pstrdup(name);
 
@@ -376,10 +417,10 @@ BufFileExportShared(BufFile *file)
 	Assert(file->fileset != NULL);
 
 	/* It's probably a bug if someone calls this twice. */
-	Assert(!file->readOnly);
+	Assert(!file->common.readOnly);
 
 	BufFileFlush(file);
-	file->readOnly = true;
+	file->common.readOnly = true;
 }
 
 /*
@@ -406,7 +447,7 @@ BufFileClose(BufFile *file)
  * BufFileLoadBuffer
  *
  * Load some data into buffer, if possible, starting from curOffset.
- * At call, must have dirty = false, pos and nbytes = 0.
+ * At call, must have dirty = false, nbytes = 0.
  * On exit, nbytes is number of bytes loaded.
  */
 static void
@@ -417,27 +458,27 @@ BufFileLoadBuffer(BufFile *file)
 	/*
 	 * Advance to next component file if necessary and possible.
 	 */
-	if (file->curOffset >= MAX_PHYSICAL_FILESIZE &&
-		file->curFile + 1 < file->numFiles)
+	if (file->common.curOffset >= MAX_PHYSICAL_FILESIZE &&
+		file->common.curFile + 1 < file->numFiles)
 	{
-		file->curFile++;
-		file->curOffset = 0L;
+		file->common.curFile++;
+		file->common.curOffset = 0L;
 	}
 
 	/*
 	 * Read whatever we can get, up to a full bufferload.
 	 */
-	thisfile = file->files[file->curFile];
-	file->nbytes = FileRead(thisfile,
-							file->buffer.data,
-							sizeof(file->buffer),
-							file->curOffset,
-							WAIT_EVENT_BUFFILE_READ);
-	if (file->nbytes < 0)
-		file->nbytes = 0;
+	thisfile = file->files[file->common.curFile];
+	file->common.nbytes = FileRead(thisfile,
+								   file->common.buffer.data,
+								   sizeof(file->common.buffer),
+								   file->common.curOffset,
+								   WAIT_EVENT_BUFFILE_READ);
+	if (file->common.nbytes < 0)
+		file->common.nbytes = 0;
 	/* we choose not to advance curOffset here */
 
-	if (file->nbytes > 0)
+	if (file->common.nbytes > 0)
 		pgBufferUsage.temp_blks_read++;
 }
 
@@ -459,44 +500,44 @@ BufFileDumpBuffer(BufFile *file)
 	 * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
 	 * crosses a component-file boundary; so we need a loop.
 	 */
-	while (wpos < file->nbytes)
+	while (wpos < file->common.nbytes)
 	{
 		off_t		availbytes;
 
 		/*
 		 * Advance to next component file if necessary and possible.
 		 */
-		if (file->curOffset >= MAX_PHYSICAL_FILESIZE)
+		if (file->common.curOffset >= MAX_PHYSICAL_FILESIZE)
 		{
-			while (file->curFile + 1 >= file->numFiles)
+			while (file->common.curFile + 1 >= file->numFiles)
 				extendBufFile(file);
-			file->curFile++;
-			file->curOffset = 0L;
+			file->common.curFile++;
+			file->common.curOffset = 0L;
 		}
 
 		/*
 		 * Determine how much we need to write into this file.
 		 */
-		bytestowrite = file->nbytes - wpos;
-		availbytes = MAX_PHYSICAL_FILESIZE - file->curOffset;
+		bytestowrite = file->common.nbytes - wpos;
+		availbytes = MAX_PHYSICAL_FILESIZE - file->common.curOffset;
 
 		if ((off_t) bytestowrite > availbytes)
 			bytestowrite = (int) availbytes;
 
-		thisfile = file->files[file->curFile];
+		thisfile = file->files[file->common.curFile];
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 file->common.buffer.data + wpos,
 								 bytestowrite,
-								 file->curOffset,
+								 file->common.curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
 		if (bytestowrite <= 0)
 			return;				/* failed to write */
-		file->curOffset += bytestowrite;
+		file->common.curOffset += bytestowrite;
 		wpos += bytestowrite;
 
 		pgBufferUsage.temp_blks_written++;
 	}
-	file->dirty = false;
+	file->common.dirty = false;
 
 	/*
 	 * At this point, curOffset has been advanced to the end of the buffer,
@@ -504,19 +545,19 @@ BufFileDumpBuffer(BufFile *file)
 	 * logical file position, ie, original value + pos, in case that is less
 	 * (as could happen due to a small backwards seek in a dirty buffer!)
 	 */
-	file->curOffset -= (file->nbytes - file->pos);
-	if (file->curOffset < 0)	/* handle possible segment crossing */
+	file->common.curOffset -= (file->common.nbytes - file->common.pos);
+	if (file->common.curOffset < 0) /* handle possible segment crossing */
 	{
-		file->curFile--;
-		Assert(file->curFile >= 0);
-		file->curOffset += MAX_PHYSICAL_FILESIZE;
+		file->common.curFile--;
+		Assert(file->common.curFile >= 0);
+		file->common.curOffset += MAX_PHYSICAL_FILESIZE;
 	}
 
 	/*
 	 * Now we can set the buffer empty without changing the logical position
 	 */
-	file->pos = 0;
-	file->nbytes = 0;
+	file->common.pos = 0;
+	file->common.nbytes = 0;
 }
 
 /*
@@ -527,43 +568,7 @@ BufFileDumpBuffer(BufFile *file)
 size_t
 BufFileRead(BufFile *file, void *ptr, size_t size)
 {
-	size_t		nread = 0;
-	size_t		nthistime;
-
-	if (file->dirty)
-	{
-		if (BufFileFlush(file) != 0)
-			return 0;			/* could not flush... */
-		Assert(!file->dirty);
-	}
-
-	while (size > 0)
-	{
-		if (file->pos >= file->nbytes)
-		{
-			/* Try to load more data into buffer. */
-			file->curOffset += file->pos;
-			file->pos = 0;
-			file->nbytes = 0;
-			BufFileLoadBuffer(file);
-			if (file->nbytes <= 0)
-				break;			/* no more data available */
-		}
-
-		nthistime = file->nbytes - file->pos;
-		if (nthistime > size)
-			nthistime = size;
-		Assert(nthistime > 0);
-
-		memcpy(ptr, file->buffer.data + file->pos, nthistime);
-
-		file->pos += nthistime;
-		ptr = (void *) ((char *) ptr + nthistime);
-		size -= nthistime;
-		nread += nthistime;
-	}
-
-	return nread;
+	return BufFileReadCommon(&file->common, ptr, size, false);
 }
 
 /*
@@ -574,48 +579,7 @@ BufFileRead(BufFile *file, void *ptr, size_t size)
 size_t
 BufFileWrite(BufFile *file, void *ptr, size_t size)
 {
-	size_t		nwritten = 0;
-	size_t		nthistime;
-
-	Assert(!file->readOnly);
-
-	while (size > 0)
-	{
-		if (file->pos >= BLCKSZ)
-		{
-			/* Buffer full, dump it out */
-			if (file->dirty)
-			{
-				BufFileDumpBuffer(file);
-				if (file->dirty)
-					break;		/* I/O error */
-			}
-			else
-			{
-				/* Hmm, went directly from reading to writing? */
-				file->curOffset += file->pos;
-				file->pos = 0;
-				file->nbytes = 0;
-			}
-		}
-
-		nthistime = BLCKSZ - file->pos;
-		if (nthistime > size)
-			nthistime = size;
-		Assert(nthistime > 0);
-
-		memcpy(file->buffer.data + file->pos, ptr, nthistime);
-
-		file->dirty = true;
-		file->pos += nthistime;
-		if (file->nbytes < file->pos)
-			file->nbytes = file->pos;
-		ptr = (void *) ((char *) ptr + nthistime);
-		size -= nthistime;
-		nwritten += nthistime;
-	}
-
-	return nwritten;
+	return BufFileWriteCommon(&file->common, ptr, size, false);
 }
 
 /*
@@ -626,10 +590,10 @@ BufFileWrite(BufFile *file, void *ptr, size_t size)
 static int
 BufFileFlush(BufFile *file)
 {
-	if (file->dirty)
+	if (file->common.dirty)
 	{
 		BufFileDumpBuffer(file);
-		if (file->dirty)
+		if (file->common.dirty)
 			return EOF;
 	}
 
@@ -667,8 +631,8 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
 			 * fileno. Note that large offsets (> 1 gig) risk overflow in this
 			 * add, unless we have 64-bit off_t.
 			 */
-			newFile = file->curFile;
-			newOffset = (file->curOffset + file->pos) + offset;
+			newFile = file->common.curFile;
+			newOffset = (file->common.curOffset + file->common.pos) + offset;
 			break;
 #ifdef NOT_USED
 		case SEEK_END:
@@ -685,9 +649,9 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
 			return EOF;
 		newOffset += MAX_PHYSICAL_FILESIZE;
 	}
-	if (newFile == file->curFile &&
-		newOffset >= file->curOffset &&
-		newOffset <= file->curOffset + file->nbytes)
+	if (newFile == file->common.curFile &&
+		newOffset >= file->common.curOffset &&
+		newOffset <= file->common.curOffset + file->common.nbytes)
 	{
 		/*
 		 * Seek is to a point within existing buffer; we can just adjust
@@ -695,7 +659,7 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
 		 * whether reading or writing, but buffer remains dirty if we were
 		 * writing.
 		 */
-		file->pos = (int) (newOffset - file->curOffset);
+		file->common.pos = (int) (newOffset - file->common.curOffset);
 		return 0;
 	}
 	/* Otherwise, must reposition buffer, so flush any dirty data */
@@ -723,18 +687,18 @@ BufFileSeek(BufFile *file, int fileno, off_t offset, int whence)
 	if (newFile >= file->numFiles)
 		return EOF;
 	/* Seek is OK! */
-	file->curFile = newFile;
-	file->curOffset = newOffset;
-	file->pos = 0;
-	file->nbytes = 0;
+	file->common.curFile = newFile;
+	file->common.curOffset = newOffset;
+	file->common.pos = 0;
+	file->common.nbytes = 0;
 	return 0;
 }
 
 void
 BufFileTell(BufFile *file, int *fileno, off_t *offset)
 {
-	*fileno = file->curFile;
-	*offset = file->curOffset + file->pos;
+	*fileno = file->common.curFile;
+	*offset = file->common.curOffset + file->common.pos;
 }
 
 /*
@@ -768,8 +732,8 @@ BufFileTellBlock(BufFile *file)
 {
 	long		blknum;
 
-	blknum = (file->curOffset + file->pos) / BLCKSZ;
-	blknum += file->curFile * BUFFILE_SEG_SIZE;
+	blknum = (file->common.curOffset + file->common.pos) / BLCKSZ;
+	blknum += file->common.curFile * BUFFILE_SEG_SIZE;
 	return blknum;
 }
 
@@ -828,8 +792,8 @@ BufFileAppend(BufFile *target, BufFile *source)
 	int			i;
 
 	Assert(target->fileset != NULL);
-	Assert(source->readOnly);
-	Assert(!source->dirty);
+	Assert(source->common.readOnly);
+	Assert(!source->common.dirty);
 	Assert(source->fileset != NULL);
 
 	if (target->resowner != source->resowner)
@@ -843,3 +807,330 @@ BufFileAppend(BufFile *target, BufFile *source)
 
 	return startBlock;
 }
+
+/*
+ * Open the TransientBufFile at "path" (fileFlags are passed on to
+ * OpenTransientFile()). Access is either read or write, never both. Returns
+ * NULL if a file opened for reading does not exist; other failures ERROR.
+ */
+TransientBufFile *
+BufFileOpenTransient(const char *path, int fileFlags)
+{
+	bool		readOnly;
+	bool		append = (fileFlags & O_APPEND) != 0;
+	TransientBufFile *file;
+	BufFileCommon *fcommon;
+	int			fd;
+	off_t		size;
+
+	/* Either read or write mode, but not both. */
+	Assert((fileFlags & O_RDWR) == 0);
+
+	/* Check whether user wants read or write access. */
+	readOnly = (fileFlags & O_WRONLY) == 0;
+
+	/*
+	 * append follows O_APPEND so that writing starts at the current end of
+	 * file. Append mode for read access is not useful, so it's not supported.
+	 */
+	Assert(!(readOnly && append));
+
+	errno = 0;
+	fd = OpenTransientFile(path, fileFlags);
+	if (fd < 0)
+	{
+		/*
+		 * A reader may legitimately find the file missing; return NULL and
+		 * let the caller handle it.  Test readOnly, not O_RDONLY: that flag
+		 * is zero on common platforms, so masking with it is always false.
+		 * XXX Shouldn't we always let caller evaluate errno?
+		 */
+		if (errno == ENOENT && readOnly)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", path)));
+	}
+
+	file = (TransientBufFile *) palloc(sizeof(TransientBufFile));
+	fcommon = &file->common;
+	fcommon->dirty = false;
+	fcommon->pos = 0;
+	fcommon->nbytes = 0;
+	fcommon->readOnly = readOnly;
+	fcommon->append = append;
+	fcommon->curFile = 0;
+
+	file->path = pstrdup(path);
+	file->fd = fd;
+
+	errno = 0;
+	size = lseek(file->fd, 0, SEEK_END);
+	if (size < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not initialize TransientBufFile for file \"%s\": %m",
+						path)));
+
+	if (fcommon->append)
+	{
+		/* Position the buffer at the end of the file. */
+		fcommon->curOffset = size;
+	}
+	else
+		fcommon->curOffset = 0L;
+
+	return file;
+}
+
+/*
+ * Close a TransientBufFile: flush pending writes, close the fd, free memory.
+ */
+void
+BufFileCloseTransient(TransientBufFile *file)
+{
+	/* Flush any unwritten data. */
+	if (!file->common.readOnly &&
+		file->common.dirty && file->common.nbytes > 0)
+	{
+		BufFileDumpBufferTransient(file);
+
+		/*
+		 * BufFileWriteTransient() reports a failed flush via its return
+		 * value. This function returns nothing, so a buffer that is still
+		 * dirty after the dump has to be raised as an ERROR here.
+		 */
+		if (file->common.dirty)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not flush file \"%s\": %m", file->path)));
+	}
+
+	if (CloseTransientFile(file->fd))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not close file \"%s\": %m", file->path)));
+
+	pfree(file->path);
+	pfree(file);
+}
+
+/*
+ * Load some data into buffer, if possible, starting from curOffset.  At
+ * call, must have dirty = false, pos and nbytes = 0.  On exit, nbytes is
+ * number of bytes loaded (0 at end of file or if the read failed).
+ */
+static void
+BufFileLoadBufferTransient(TransientBufFile *file)
+{
+	Assert(file->common.readOnly);
+	Assert(!file->common.dirty);
+	Assert(file->common.pos == 0 && file->common.nbytes == 0);
+
+retry:
+
+	/*
+	 * Read whatever we can get, up to a full bufferload.
+	 */
+	errno = 0;
+	pgstat_report_wait_start(WAIT_EVENT_BUFFILE_READ);
+	file->common.nbytes = pg_pread(file->fd,
+								   file->common.buffer.data,
+								   sizeof(file->common.buffer),
+								   file->common.curOffset);
+	pgstat_report_wait_end();
+
+	if (file->common.nbytes < 0)
+	{
+		/* TODO The W32 specific code, see FileWrite. */
+
+		/* OK to retry if interrupted */
+		if (errno == EINTR)
+			goto retry;
+
+		file->common.nbytes = 0;	/* like BufFileLoadBuffer: nothing loaded */
+	}
+	/* we choose not to advance offset here */
+}
+
+/*
+ * Write the buffer of a transient file to disk; dirty stays set on failure.
+ */
+static void
+BufFileDumpBufferTransient(TransientBufFile *file)
+{
+	int			nwritten;
+
+	/* This function should only be needed during write access ... */
+	Assert(!file->common.readOnly);
+
+	/* ... and if there's some work to do. */
+	Assert(file->common.dirty);
+	Assert(file->common.nbytes > 0);
+
+retry:
+	errno = 0;
+	pgstat_report_wait_start(WAIT_EVENT_BUFFILE_WRITE);
+	nwritten = pg_pwrite(file->fd,
+						 file->common.buffer.data,
+						 file->common.nbytes,
+						 file->common.curOffset);
+	pgstat_report_wait_end();
+
+	/* if write didn't set errno, assume problem is no disk space */
+	if (nwritten != file->common.nbytes && errno == 0)
+		errno = ENOSPC;
+
+	if (nwritten != file->common.nbytes)
+	{
+		/* TODO The W32 specific code, see FileWrite. */
+
+		/* OK to retry if interrupted before anything was written */
+		if (nwritten < 0 && errno == EINTR)
+			goto retry;
+
+		return;					/* failed or short write: buffer stays dirty */
+	}
+
+	file->common.curOffset += nwritten;	/* next dump must not overwrite this */
+	file->common.dirty = false;
+	file->common.pos = 0;
+	file->common.nbytes = 0;
+}
+
+/*
+ * Read up to size bytes from a TransientBufFile; the BufFileRead() analogue.
+ */
+size_t
+BufFileReadTransient(TransientBufFile *file, void *ptr, size_t size)
+{
+	return BufFileReadCommon(&file->common, ptr, size, true);
+}
+
+/*
+ * Write size bytes to a TransientBufFile; the BufFileWrite() analogue.
+ */
+size_t
+BufFileWriteTransient(TransientBufFile *file, void *ptr, size_t size)
+{
+	return BufFileWriteCommon(&file->common, ptr, size, true);
+}
+
+/*
+ * BufFileReadCommon
+ *
+ * Buffered read shared by BufFileRead() and BufFileReadTransient().
+ */
+static size_t
+BufFileReadCommon(BufFileCommon *file, void *ptr, size_t size,
+				  bool is_transient)
+{
+	size_t		nread = 0;
+	size_t		nthistime;
+
+	if (file->dirty)
+	{
+		/*
+		 * Transient files are never open for both reading and writing, so only
+		 * a BufFile gets here (cast presumably needs "common" first -- confirm).
+		 */
+		Assert(!is_transient);
+
+		if (BufFileFlush((BufFile *) file) != 0)
+			return 0;			/* could not flush... */
+		Assert(!file->dirty);
+	}
+
+	while (size > 0)
+	{
+		if (file->pos >= file->nbytes)
+		{
+			/* Buffer consumed: step past it and try to load the next one. */
+			file->curOffset += file->pos;
+			file->pos = 0;
+			file->nbytes = 0;
+
+			if (!is_transient)
+				BufFileLoadBuffer((BufFile *) file);
+			else
+				BufFileLoadBufferTransient((TransientBufFile *) file);
+
+			if (file->nbytes <= 0)
+				break;			/* no more data available */
+		}
+
+		nthistime = file->nbytes - file->pos;
+		if (nthistime > size)
+			nthistime = size;
+		Assert(nthistime > 0);
+
+		memcpy(ptr, file->buffer.data + file->pos, nthistime);
+
+		file->pos += nthistime;
+		ptr = (void *) ((char *) ptr + nthistime);
+		size -= nthistime;
+		nread += nthistime;
+	}
+
+	return nread;
+}
+
+/*
+ * BufFileWriteCommon
+ *
+ * Buffered write shared by BufFileWrite() and BufFileWriteTransient().
+ */
+static size_t
+BufFileWriteCommon(BufFileCommon *file, void *ptr, size_t size,
+				   bool is_transient)
+{
+	size_t		nwritten = 0;
+	size_t		nthistime;
+
+	Assert(!file->readOnly);
+
+	while (size > 0)
+	{
+		if (file->pos >= BLCKSZ)
+		{
+			/* Buffer full, dump it out with the routine for this file kind */
+			if (file->dirty)
+			{
+				if (!is_transient)
+					BufFileDumpBuffer((BufFile *) file);
+				else
+					BufFileDumpBufferTransient((TransientBufFile *) file);
+
+				if (file->dirty)
+					break;		/* I/O error */
+			}
+			else
+			{
+				Assert(!is_transient);
+
+				/* Hmm, went directly from reading to writing? */
+				file->curOffset += file->pos;
+				file->pos = 0;
+				file->nbytes = 0;
+			}
+		}
+
+		nthistime = BLCKSZ - file->pos;
+		if (nthistime > size)
+			nthistime = size;
+		Assert(nthistime > 0);
+
+		memcpy(file->buffer.data + file->pos, ptr, nthistime);
+
+		file->dirty = true;
+		file->pos += nthistime;
+		if (file->nbytes < file->pos)
+			file->nbytes = file->pos;
+		ptr = (void *) ((char *) ptr + nthistime);
+		size -= nthistime;
+		nwritten += nthistime;
+	}
+
+	return nwritten;
+}
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 0a3ad3a188..b923a182a4 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -912,8 +912,6 @@ typedef enum
 	WAIT_EVENT_RELATION_MAP_READ,
 	WAIT_EVENT_RELATION_MAP_SYNC,
 	WAIT_EVENT_RELATION_MAP_WRITE,
-	WAIT_EVENT_REORDER_BUFFER_READ,
-	WAIT_EVENT_REORDER_BUFFER_WRITE,
 	WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ,
 	WAIT_EVENT_REPLICATION_SLOT_READ,
 	WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC,
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 735e6d32b8..01fc91a8d0 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -387,10 +387,6 @@ struct ReorderBuffer
 	MemoryContext tup_context;
 
 	XLogRecPtr	current_restart_decoding_lsn;
-
-	/* buffer for disk<->memory conversions */
-	char	   *outbuf;
-	Size		outbufsize;
 };
 
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index 1fba404fe2..625fa0014c 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -28,9 +28,13 @@
 
 #include "storage/sharedfileset.h"
 
-/* BufFile is an opaque type whose details are not known outside buffile.c. */
+/*
+ * BufFile and TransientBufFile are opaque types whose details are not known
+ * outside buffile.c.
+ */
 
 typedef struct BufFile BufFile;
+typedef struct TransientBufFile TransientBufFile;
 
 /*
  * prototypes for functions in buffile.c
@@ -51,4 +55,11 @@ extern void BufFileExportShared(BufFile *file);
 extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name);
 extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name);
 
+extern TransientBufFile *BufFileOpenTransient(const char *path, int fileFlags);
+extern void BufFileCloseTransient(TransientBufFile *file);
+extern size_t BufFileReadTransient(TransientBufFile *file, void *ptr,
+					 size_t size);
+extern size_t BufFileWriteTransient(TransientBufFile *file, void *ptr,
+					  size_t size);
+
 #endif							/* BUFFILE_H */
-- 
2.13.7

