From 8c015e8a5a896d14ef4eab6801c0c670a1f99993 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
Date: Mon, 10 Oct 2016 11:14:13 +0300
Subject: [PATCH 2/2] Pause/resume support.

When a tape is "paused", we write the final block to disk, and release the
buffer, but still allow writing to it again later. This saves memory
in the initial run building phase, as we can release the buffer of all
tapes except the one we're currently writing to, which frees up the memory
for the quicksort. This is particularly important because we used to
reserve the memory for maxTapes, i.e. the number of tapes that we might
want to use, not the actual number of tapes used.
---
 src/backend/utils/sort/logtape.c   | 69 +++++++++++++++++++++++++++++++++-----
 src/backend/utils/sort/tuplesort.c | 22 +++++++-----
 src/include/utils/logtape.h        |  1 +
 3 files changed, 76 insertions(+), 16 deletions(-)

diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index 7e1f0d8..fa64115 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -112,6 +112,7 @@ typedef struct TapeBlockTrailer
 typedef struct LogicalTape
 {
 	bool		writing;		/* T while in write phase */
+	bool		paused;			/* T if the tape is paused */
 	bool		frozen;			/* T if blocks should not be freed when read */
 	bool		dirty;			/* does buffer need to be written? */
 
@@ -380,6 +381,7 @@ LogicalTapeSetCreate(int ntapes)
 		lt->writing = true;
 		lt->frozen = false;
 		lt->dirty = false;
+		lt->paused = false;
 		lt->firstBlockNumber = -1L;
 		lt->curBlockNumber = -1L;
 		lt->buffer = NULL;
@@ -447,6 +449,22 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 	{
 		lt->buffer = (char *) palloc(BLCKSZ);
 		lt->buffer_size = BLCKSZ;
+
+		/* if the tape was paused, resume it now. */
+		if (lt->paused)
+		{
+			ltsReadBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+
+			/* 'pos' and 'nbytes' should still be valid */
+			Assert(lt->nbytes == TapeBlockGetNBytes(lt->buffer));
+			lt->paused = false;
+		}
+		else
+		{
+			Assert(lt->curBlockNumber == -1);
+			lt->pos = 0;
+			lt->nbytes = 0;
+		}
 	}
 	if (lt->curBlockNumber == -1)
 	{
@@ -467,12 +485,6 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 			/* Buffer full, dump it out */
 			long		nextBlockNumber;
 
-			if (!lt->dirty)
-			{
-				/* Hmm, went directly from reading to writing? */
-				elog(ERROR, "invalid logtape state: should be dirty");
-			}
-
 			/*
 			 * First allocate the next block, so that we can store it
 			 * in the 'next' pointer of this block.
@@ -505,6 +517,36 @@ LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 }
 
 /*
+ * Pause writing to a tape.
+ *
+ * Writes the last partial data block to disk and releases the write buffer,
+ * without rewinding.  The next LogicalTapeWrite() on the tape un-pauses it
+ * implicitly by reading that block back.  A tape that holds no buffer (never
+ * written, or already paused) is left untouched.
+ */
+void
+LogicalTapePause(LogicalTapeSet *lts, int tapenum)
+{
+	LogicalTape *lt;
+
+	Assert(tapenum >= 0 && tapenum < lts->nTapes);
+	lt = &lts->tapes[tapenum];
+	Assert(lt->writing);
+
+	/* No buffer means there is no partial block to flush or release. */
+	if (lt->buffer == NULL)
+		return;
+
+	/* Flush last partial data block, recording nbytes in its trailer. */
+	TapeBlockGetTrailer(lt->buffer)->next = lt->nbytes - BLCKSZ;
+	ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
+	lt->dirty = false;
+	pfree(lt->buffer);
+	lt->buffer = NULL;
+	lt->paused = true;		/* 'pos' and 'nbytes' are deliberately kept */
+}
+
+/*
  * Rewind logical tape and switch from writing to reading or vice versa.
  *
  * Unless the tape has been "frozen" in read state, forWrite must be the
@@ -520,7 +562,17 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 
 	if (!forWrite)
 	{
-		if (lt->writing)
+		if (lt->paused)
+		{
+			/*
+			 * A paused tape is flushed to disk already, we just have to
+			 * change the state in memory to indicate that we're reading
+			 * it now, and allocate a buffer for the reading.
+			 */
+			Assert(lt->writing);
+			lt->paused = false;
+		}
+		else if (lt->writing)
 		{
 			/*
 			 * Completion of a write phase.  Flush last partial data block,
@@ -531,7 +583,6 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 				TapeBlockGetTrailer(lt->buffer)->next = lt->nbytes - BLCKSZ;
 				ltsWriteBlock(lts, lt->curBlockNumber, (void *) lt->buffer);
 			}
-			lt->writing = false;
 		}
 		else
 		{
@@ -541,6 +592,7 @@ LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
 			 */
 			Assert(lt->frozen);
 		}
+		lt->writing = false;
 
 		/* Allocate a read buffer (unless the tape is empty) */
 		if (lt->buffer)
@@ -659,6 +711,7 @@ LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
 		lt->writing = false;
 	}
 	lt->writing = false;
+	lt->paused = false;
 	lt->frozen = true;
 
 	/*
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 7c9232c..c70ee8b 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -2355,15 +2355,15 @@ inittapes(Tuplesortstate *state)
 #endif
 
 	/*
-	 * Decrease availMem to reflect the space needed for tape buffers, when
-	 * writing the initial runs; but don't decrease it to the point that we
-	 * have no room for tuples.  (That case is only likely to occur if sorting
-	 * pass-by-value Datums; in all other scenarios the memtuples[] array is
-	 * unlikely to occupy more than half of allowedMem.  In the pass-by-value
-	 * case it's not important to account for tuple space, so we don't care if
-	 * LACKMEM becomes inaccurate.)
+	 * Decrease availMem to reflect the space needed for the tape buffer of the
+	 * output tape, when writing the initial runs; but don't decrease it to the
+	 * point that we have no room for tuples.  (That case is only likely to
+	 * occur if sorting pass-by-value Datums; in all other scenarios the
+	 * memtuples[] array is unlikely to occupy more than half of allowedMem.
+	 * In the pass-by-value case it's not important to account for tuple space,
+	 * so we don't care if LACKMEM becomes inaccurate.)
 	 */
-	tapeSpace = (int64) maxTapes *TAPE_BUFFER_OVERHEAD;
+	tapeSpace = (int64) TAPE_BUFFER_OVERHEAD;
 
 	if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
 		USEMEM(state, tapeSpace);
@@ -2455,6 +2455,12 @@ selectnewtape(Tuplesortstate *state)
 	int			j;
 	int			a;
 
+	/*
+	 * Pause the old tape, to release the memory that was used for its buffer,
+	 * so that we can use it for building the next run.
+	 */
+	LogicalTapePause(state->tapeset, state->destTape);
+
 	/* Step D3: advance j (destTape) */
 	if (state->tp_dummy[state->destTape] < state->tp_dummy[state->destTape + 1])
 	{
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
index 92e8245..6e3e5be 100644
--- a/src/include/utils/logtape.h
+++ b/src/include/utils/logtape.h
@@ -32,6 +32,7 @@ extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
 extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
 				 void *ptr, size_t size);
 extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
+extern void LogicalTapePause(LogicalTapeSet *lts, int tapenum);
 extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
 extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
 					 size_t size);
-- 
2.9.3

