From df931f39efb9e7fd50108345ebeb53a098d0dc29 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@otacoo.com>
Date: Sun, 11 Oct 2015 20:46:40 +0900
Subject: [PATCH] Fix OOM error handling in COPY protocol of libpq

An OOM occurring while all the data needed by process from server has been
received can result in an infinite loop when parsing the output message.
getCopyStart is switched to discard a a message read from server in case of
server and any subsequent ones when receiving data from server for
PGASYNC_COPY_OUT, and not wait for any additional data when input is expected
via PGASYNC_COPY_IN. In the case of PGASYNC_COPY_BOTH, both concepts apply.
---
 .../libpqwalreceiver/libpqwalreceiver.c            |  1 +
 src/interfaces/libpq/fe-exec.c                     | 12 ++++
 src/interfaces/libpq/fe-protocol3.c                | 71 +++++++++++++++++-----
 3 files changed, 69 insertions(+), 15 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f670957..5e79b78 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -445,6 +445,7 @@ libpqrcv_PQexec(const char *query)
 		if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
 			PQresultStatus(lastResult) == PGRES_COPY_OUT ||
 			PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
+			PQresultStatus(lastResult) == PGRES_FATAL_ERROR ||
 			PQstatus(streamConn) == CONNECTION_BAD)
 			break;
 	}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 41937c0..c99f193 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1804,6 +1804,10 @@ getCopyResult(PGconn *conn, ExecStatusType copytype)
 		return pqPrepareAsyncResult(conn);
 	}
 
+	/* If error has occured, return a PGRES_FATAL_ERROR result */
+	if (conn->result && conn->result->resultStatus == PGRES_FATAL_ERROR)
+		return pqPrepareAsyncResult(conn);
+
 	/* If we have an async result for the COPY, return that */
 	if (conn->result && conn->result->resultStatus == copytype)
 		return pqPrepareAsyncResult(conn);
@@ -1994,6 +1998,9 @@ PQexecFinish(PGconn *conn)
 	 * We have to stop if we see copy in/out/both, however. We will resume
 	 * parsing after application performs the data transfer.
 	 *
+	 * Stop if we are in copy mode and error has occurred, the pending results
+	 * will be discarded during next execution in PQexecStart.
+	 *
 	 * Also stop if the connection is lost (else we'll loop infinitely).
 	 */
 	lastResult = NULL;
@@ -2023,6 +2030,11 @@ PQexecFinish(PGconn *conn)
 			result->resultStatus == PGRES_COPY_BOTH ||
 			conn->status == CONNECTION_BAD)
 			break;
+		else if ((conn->asyncStatus == PGASYNC_COPY_IN ||
+				  conn->asyncStatus == PGASYNC_COPY_OUT  ||
+				  conn->asyncStatus == PGASYNC_COPY_BOTH) &&
+				 result->resultStatus == PGRES_FATAL_ERROR)
+			break;
 	}
 
 	return lastResult;
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 43898a4..21a1d9b 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -49,7 +49,9 @@ static int	getParamDescriptions(PGconn *conn, int msgLength);
 static int	getAnotherTuple(PGconn *conn, int msgLength);
 static int	getParameterStatus(PGconn *conn);
 static int	getNotify(PGconn *conn);
-static int	getCopyStart(PGconn *conn, ExecStatusType copytype);
+static int	getCopyStart(PGconn *conn,
+						 ExecStatusType copytype,
+						 int msgLength);
 static int	getReadyForQuery(PGconn *conn);
 static void reportErrorPosition(PQExpBuffer msg, const char *query,
 					int loc, int encoding);
@@ -372,22 +374,25 @@ pqParseInput3(PGconn *conn)
 					}
 					break;
 				case 'G':		/* Start Copy In */
-					if (getCopyStart(conn, PGRES_COPY_IN))
+					if (getCopyStart(conn, PGRES_COPY_IN, msgLength))
 						return;
+					/* getCopyStart() moves inStart itself */
 					conn->asyncStatus = PGASYNC_COPY_IN;
-					break;
+					continue;
 				case 'H':		/* Start Copy Out */
-					if (getCopyStart(conn, PGRES_COPY_OUT))
+					if (getCopyStart(conn, PGRES_COPY_OUT, msgLength))
 						return;
+					/* getCopyStart() moves inStart itself */
 					conn->asyncStatus = PGASYNC_COPY_OUT;
 					conn->copy_already_done = 0;
-					break;
+					continue;
 				case 'W':		/* Start Copy Both */
-					if (getCopyStart(conn, PGRES_COPY_BOTH))
+					if (getCopyStart(conn, PGRES_COPY_BOTH, msgLength))
 						return;
+					/* getCopyStart() moves inStart itself */
 					conn->asyncStatus = PGASYNC_COPY_BOTH;
 					conn->copy_already_done = 0;
-					break;
+					continue;
 				case 'd':		/* Copy Data */
 
 					/*
@@ -1385,22 +1390,24 @@ getNotify(PGconn *conn)
  * parseInput already read the message type and length.
  */
 static int
-getCopyStart(PGconn *conn, ExecStatusType copytype)
+getCopyStart(PGconn *conn, ExecStatusType copytype, int msgLength)
 {
 	PGresult   *result;
 	int			nfields;
 	int			i;
+	const char *errmsg = NULL;
 
 	result = PQmakeEmptyPGresult(conn, copytype);
 	if (!result)
-		goto failure;
+		goto advance_and_error;
 
 	if (pqGetc(&conn->copy_is_binary, conn))
-		goto failure;
+		goto not_enough_data;
 	result->binary = conn->copy_is_binary;
+
 	/* the next two bytes are the number of fields	*/
-	if (pqGetInt(&(result->numAttributes), 2, conn))
-		goto failure;
+	if (pqGetInt(&result->numAttributes, 2, conn))
+		goto not_enough_data;
 	nfields = result->numAttributes;
 
 	/* allocate space for the attribute descriptors */
@@ -1409,7 +1416,7 @@ getCopyStart(PGconn *conn, ExecStatusType copytype)
 		result->attDescs = (PGresAttDesc *)
 			pqResultAlloc(result, nfields * sizeof(PGresAttDesc), TRUE);
 		if (!result->attDescs)
-			goto failure;
+			goto advance_and_error;
 		MemSet(result->attDescs, 0, nfields * sizeof(PGresAttDesc));
 	}
 
@@ -1418,7 +1425,7 @@ getCopyStart(PGconn *conn, ExecStatusType copytype)
 		int			format;
 
 		if (pqGetInt(&format, 2, conn))
-			goto failure;
+			goto not_enough_data;
 
 		/*
 		 * Since pqGetInt treats 2-byte integers as unsigned, we need to
@@ -1430,11 +1437,45 @@ getCopyStart(PGconn *conn, ExecStatusType copytype)
 
 	/* Success! */
 	conn->result = result;
+
+	/*
+	 * Advance inStart to show that the copy related message has been
+	 * processed.
+	 */
+	conn->inStart = conn->inCursor;
+
 	return 0;
 
-failure:
+not_enough_data:
 	PQclear(result);
 	return EOF;
+
+advance_and_error:
+	/* Discard unsaved result, if any */
+	if (result && result != conn->result)
+		PQclear(result);
+
+	/* Discard the failed message by pretending we read it */
+	conn->inStart += 5 + msgLength;
+
+	/*
+	 * Replace partially constructed result with an error result. First
+	 * discard the old result to try to win back some memory.
+	 */
+	pqClearAsyncResult(conn);
+
+	/*
+	 * If preceding code didn't provide an error message, assume "out of
+	 * memory" was meant.  The advantage of having this special case is that
+	 * freeing the old result first greatly improves the odds that gettext()
+	 * will succeed in providing a translation.
+	 */
+	if (!errmsg)
+		errmsg = libpq_gettext("out of memory");
+	printfPQExpBuffer(&conn->errorMessage, "%s\n", errmsg);
+	pqSaveErrorResult(conn);
+
+	return 0;
 }
 
 /*
-- 
2.7.2

