diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9ce32fb39b..2a94f8f6b9 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -3061,6 +3061,30 @@ ExecStatusType PQresultStatus(const PGresult *res);
+
+
+ PGRES_BATCH_END
+
+
+ The PGresult represents the end of a batch.
+ This status occurs only when batch mode has been selected.
+
+
+
+
+
+ PGRES_BATCH_ABORTED
+
+
+      The PGresult represents a batch that has
+      received an error from the server. PQgetResult
+      must be called repeatedly, and each time it will return this status
+      code until the end of the current batch; at that point it will return
+      PGRES_BATCH_END and normal processing can resume.
+
+
+
+
If the result status is PGRES_TUPLES_OK or
@@ -4819,6 +4843,482 @@ int PQflush(PGconn *conn);
+
+ Batch Mode
+
+
+ libpq
+ batch mode
+
+
+
+ pipelining
+ in libpq
+
+
+
+   libpq batch mode allows applications to
+   send a query without having to read the result of the previously
+   sent query. By taking advantage of batch mode, a client waits less
+   for the server, since multiple queries and results can be sent and
+   received in a single network transaction.
+
+
+
+   While batch mode provides a significant performance boost, writing
+   clients that use batch mode is more complex because it involves
+   managing a queue of pending queries and finding which result
+   corresponds to which query in the queue.
+
+
+
+ Using Batch Mode
+
+
+   To issue batches, the application must switch the connection into batch mode.
+   Enter batch mode with PQenterBatchMode,
+   or test whether batch mode is active with
+   PQbatchStatus.
+   In batch mode, only asynchronous operations
+   are permitted, and COPY is not recommended as it
+   may trigger failure in batch processing. Using any synchronous
+   command execution function, such as PQfn,
+   PQexec, or one of their sibling functions, is an error
+   condition.
+
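+
+   As an illustrative sketch only (assuming an already-established
+   connection and the usual libpq headers; the helper name is made up
+   for this example), switching a connection into batch mode might
+   look like this:
+
+#include "libpq-fe.h"
+
+/* sketch: put an established, idle connection into batch mode */
+static int
+begin_batch(PGconn *conn)
+{
+    /* non-blocking mode is recommended to avoid client/server deadlocks */
+    if (PQsetnonblocking(conn, 1) != 0)
+        return 0;
+
+    /* a no-op if already in batch mode; fails if the connection is busy */
+    if (PQenterBatchMode(conn) != 1)
+        return 0;
+
+    return PQbatchStatus(conn) == PQBATCH_MODE_ON;
+}
+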
+
+
+
+ It is best to use batch mode with libpq in
+ non-blocking mode. If used
+ in blocking mode it is possible for a client/server deadlock to occur.
+
+
+ The client will block trying to send queries to the server, but the
+ server will block trying to send results to the client from queries
+ it has already processed. This only occurs when the client sends
+ enough queries to fill its output buffer and the server's receive
+ buffer before switching to processing input from the server,
+ but it's hard to predict exactly when that will happen.
+
+
+
+
+   Batch mode consumes more memory when sending and receiving are not done
+   in a timely manner, even in non-blocking mode, because queued commands
+   and unread results accumulate in the connection's buffers.
+
+
+
+
+ Issuing Queries
+
+
+   After entering batch mode, the application dispatches requests using
+   the normal asynchronous libpq functions such as
+   PQsendQueryParams, PQsendPrepare,
+   PQsendQueryPrepared, PQsendDescribePortal, and
+   PQsendDescribePrepared.
+   The asynchronous requests are followed by a
+   PQbatchSendQueue
+   call to mark the end of the batch. The client does not need to
+   call PQgetResult immediately after
+   dispatching each operation.
+   Result processing
+   is handled separately.
+
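+
+   For illustration, a hypothetical helper (the helper name is made up;
+   the table is the one used by the regression test program) that queues
+   two parameterized inserts and then ends the batch could be sketched as:
+
+#include "libpq-fe.h"
+
+/* sketch: queue two statements, then send the end-of-batch sync */
+static int
+queue_two_inserts(PGconn *conn, const char *a, const char *b)
+{
+    const char *params[1];
+
+    params[0] = a;
+    if (PQsendQueryParams(conn, "INSERT INTO batch_demo(itemno) VALUES ($1)",
+                          1, NULL, params, NULL, NULL, 0) != 1)
+        return 0;
+
+    params[0] = b;
+    if (PQsendQueryParams(conn, "INSERT INTO batch_demo(itemno) VALUES ($1)",
+                          1, NULL, params, NULL, NULL, 0) != 1)
+        return 0;
+
+    /* marks the end of the batch with a sync message and flushes it */
+    return PQbatchSendQueue(conn);
+}
+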
+
+
+   The server executes statements, and returns results, in the order the
+   client sends them. The server may begin executing the batch before all
+   commands in the batch are queued and the end-of-batch command is sent.
+   If any statement encounters an error, the server aborts the current
+   transaction and skips processing the rest of the batch.
+   Query processing resumes after the end of the failed batch.
+
+
+
+ It's fine for one operation to depend on the results of a
+ prior one. One query may define a table that the next query in the same
+ batch uses; similarly, an application may create a named prepared statement
+ then execute it with later statements in the same batch.
+
+
+
+
+ Processing Results
+
+
+ To process the result of one batch query, the application calls
+ PQgetResult repeatedly and handles each result
+ until PQgetResult returns null.
+ The result from the next batch query may then be retrieved using
+ PQgetResult again and the cycle repeated.
+ The application handles individual statement results as normal.
+ When the results of all the queries in the batch have been
+ returned, PQgetResult returns a result
+ containing the status value PGRES_BATCH_END.
+
+
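+
+   A minimal result-collection sketch, assuming the application knows how
+   many queries it queued before calling PQbatchSendQueue
+   (the helper name is illustrative only):
+
+#include "libpq-fe.h"
+
+/* sketch: drain the results of nqueries queued statements, then the
+ * PGRES_BATCH_END result that terminates the batch */
+static int
+drain_batch(PGconn *conn, int nqueries)
+{
+    PGresult   *res;
+    int         ok = 1;
+    int         i;
+
+    for (i = 0; i < nqueries; i++)
+    {
+        /* all results of one query, terminated by a NULL return */
+        while ((res = PQgetResult(conn)) != NULL)
+        {
+            ExecStatusType s = PQresultStatus(res);
+
+            if (s == PGRES_FATAL_ERROR || s == PGRES_BATCH_ABORTED)
+                ok = 0;
+            PQclear(res);
+        }
+    }
+
+    /* the batch itself ends with a PGRES_BATCH_END result */
+    res = PQgetResult(conn);
+    if (res == NULL || PQresultStatus(res) != PGRES_BATCH_END)
+        ok = 0;
+    PQclear(res);
+
+    return ok;
+}
+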
+
+   The client may choose to defer result processing until the complete
+   batch has been sent, or interleave that with sending further batch
+   queries; see the section on interleaving result processing and query
+   dispatch below.
+
+
+
+   To enter single-row mode, call PQsetSingleRowMode
+   before retrieving results with PQgetResult.
+   This mode selection is effective only for the query currently
+   being processed. For more information on the use of
+   PQsetSingleRowMode,
+   refer to the section on retrieving query results row-by-row.
+
+
+
+   PQgetResult behaves the same as for normal
+   asynchronous processing except that it may return the new
+   PGresult status values PGRES_BATCH_END
+   and PGRES_BATCH_ABORTED.
+   PGRES_BATCH_END is reported exactly once for each
+   PQbatchSendQueue call at the corresponding point in
+   the result stream and at no other time.
+   PGRES_BATCH_ABORTED is emitted during error handling;
+   see the section on error handling below.
+
+
+
+   PQisBusy, PQconsumeInput, etc.
+   operate as normal when processing batch results.
+
+
+
+   libpq does not provide any information to the
+   application about the query currently being processed (except that
+   PQgetResult returns null to indicate that the
+   results of the next query follow). The application must keep track
+   of the order in which it sent queries and the expected results.
+   Applications will typically use a state machine or a FIFO queue for
+   this, as sketched below.
+
+
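+
+   A client-side expected-results queue could be as simple as the
+   following sketch; none of these names are part of libpq:
+
+#include "libpq-fe.h"
+
+/* sketch of an entry in the application's expected-results FIFO queue */
+typedef struct ExpectedResult
+{
+    const char *description;                        /* what was queued */
+    void      (*handle) (PGresult *res, void *arg); /* how to process it */
+    void       *arg;
+    struct ExpectedResult *next;
+} ExpectedResult;
+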
+
+
+
+ Error Handling
+
+
+ When a query in a batch causes an ERROR the server
+ skips processing all subsequent messages until the end-of-batch message.
+ The open transaction is aborted.
+
+
+
+   From the client's perspective, after it gets a
+   PGRES_FATAL_ERROR return from
+   PQresultStatus, the batch is flagged as aborted.
+   libpq will report a
+   PGRES_BATCH_ABORTED result for each remaining queued
+   operation in the aborted batch. The result for
+   PQbatchSendQueue is reported as
+   PGRES_BATCH_END to signal the end of the aborted batch
+   and the resumption of normal result processing.
+
+
+
+ The client must process results with
+ PQgetResult during error recovery.
+
+
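+
+   As a sketch, error recovery can simply read and discard results until
+   the end of the aborted batch is reached (the helper name is
+   illustrative; it assumes PQbatchSendQueue has already
+   been called for this batch):
+
+#include "libpq-fe.h"
+
+/* sketch: after a PGRES_FATAL_ERROR, consume the rest of the aborted batch */
+static void
+recover_from_aborted_batch(PGconn *conn)
+{
+    PGresult   *res;
+
+    for (;;)
+    {
+        res = PQgetResult(conn);
+        if (res == NULL)
+            continue;           /* boundary between two queued queries */
+        if (PQresultStatus(res) == PGRES_BATCH_END)
+        {
+            PQclear(res);
+            break;              /* normal processing can resume */
+        }
+        /* expect PGRES_BATCH_ABORTED for each remaining queued query */
+        PQclear(res);
+    }
+}
+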
+
+   If the batch used an implicit transaction, then operations that have
+   already executed are rolled back and operations that were queued after
+   the failed operation are skipped entirely. The same behavior holds if the
+   batch starts and commits a single explicit transaction (i.e., the first
+   statement is BEGIN and the last is
+   COMMIT), except that the session remains in an aborted
+   transaction state at the end of the batch. If a batch contains
+   multiple explicit transactions, all transactions that committed
+   prior to the error remain committed, the currently in-progress transaction
+   is aborted, and all subsequent operations in the current and all later
+   transactions in the same batch are skipped completely.
+
+
+
+
+   The client must not assume that work is committed when it
+   sends a COMMIT, but only when the
+   corresponding result is received, confirming that the commit is complete.
+   Because errors arrive asynchronously, the application needs to be able to
+   restart from the last received committed change and
+   resend work done after that point if something goes wrong.
+
+
+
+
+
+ Interleaving Result Processing and Query Dispatch
+
+
+   To avoid deadlocks on large batches the client should be structured
+   around a non-blocking event loop using operating system facilities
+   such as select, poll,
+   WaitForMultipleObjectsEx, etc.
+
+
+
+   The client application should generally maintain a queue of work
+   still to be dispatched and a queue of work that has been dispatched
+   but not yet had its results processed. When the socket is writable
+   it should dispatch more work. When the socket is readable it should
+   read results and process them, matching them up to the next entry in
+   its expected results queue. Results should be read from the socket
+   frequently, to keep memory use in check; there is no need to wait
+   until the end of the batch to read results. Batches should be scoped
+   to logical units of work, usually (but not necessarily) one transaction
+   per batch. There's no need to exit batch mode and re-enter it between
+   batches, or to wait for one batch to finish before sending the next.
+
+
+
+ An example using select() and a simple state
+ machine to track sent and received work is in
+ src/test/modules/test_libpq/testlibpqbatch.c
+ in the PostgreSQL source distribution.
+
+
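+
+   The receiving half of such a loop might be sketched as follows
+   (function names other than the libpq calls are made up; the sending
+   half and the expected-results bookkeeping are left as comments):
+
+#include <sys/select.h>
+#include <stdbool.h>
+
+#include "libpq-fe.h"
+
+/* sketch: read and process batch results until PGRES_BATCH_END is seen */
+static bool
+pump_batch_results(PGconn *conn)
+{
+    int         sock = PQsocket(conn);
+    bool        done = false;
+
+    while (!done)
+    {
+        fd_set      rfds;
+
+        FD_ZERO(&rfds);
+        FD_SET(sock, &rfds);
+        if (select(sock + 1, &rfds, NULL, NULL, NULL) < 0)
+            return false;
+
+        if (!PQconsumeInput(conn))
+            return false;
+
+        /* consume everything that can be processed without blocking */
+        while (!done && !PQisBusy(conn))
+        {
+            PGresult   *res = PQgetResult(conn);
+
+            if (res == NULL)
+                continue;       /* boundary between two queued queries */
+            if (PQresultStatus(res) == PGRES_BATCH_END)
+                done = true;
+            /* otherwise, match res against the expected-results queue */
+            PQclear(res);
+        }
+    }
+    return true;
+}
+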
+
+
+ Ending Batch Mode
+
+
+   Once all dispatched commands have had their results processed and
+   the end-of-batch result has been consumed, the application may return
+   to non-batched mode with PQexitBatchMode.
+
+
+
+
+
+ Functions Associated with Batch Mode
+
+
+
+
+
+ PQbatchStatus
+
+ PQbatchStatus
+
+
+
+
+
+   Returns the current batch mode status of the libpq
+   connection.
+
+int PQbatchStatus(const PGconn *conn);
+
+
+
+
+ PQbatchStatus can return one of the following values:
+
+
+
+
+ PQBATCH_MODE_ON
+
+
+
+ The libpq connection is in
+ batch mode.
+
+
+
+
+
+
+ PQBATCH_MODE_OFF
+
+
+
+ The libpq connection is
+ not in batch mode.
+
+
+
+
+
+
+ PQBATCH_MODE_ABORTED
+
+
+
+       The libpq connection is in an aborted
+       batch state. The aborted flag is cleared as soon as the result
+       of the PQbatchSendQueue at the end of the aborted
+       batch is processed. Clients do not usually need this function to
+       check for the aborted state, as they can tell that the batch is
+       aborted from the PGRES_BATCH_ABORTED result code.
+
+
+
+
+
+
+
+
+
+
+
+ PQenterBatchMode
+
+ PQenterBatchMode
+
+
+
+
+
+ Causes a connection to enter batch mode if it is currently idle or
+ already in batch mode.
+
+
+int PQenterBatchMode(PGconn *conn);
+
+
+
+
+   Returns 1 for success.
+   Returns 0 and has no effect if the connection is not currently
+   idle, i.e., it has a result ready, it is waiting for more
+   input from the server, etc.
+   This function does not actually send anything to the server;
+   it just changes the libpq connection
+   state.
+
+
+
+
+
+
+ PQexitBatchMode
+
+ PQexitBatchMode
+
+
+
+
+
+ Causes a connection to exit batch mode if it is currently in batch mode
+ with an empty queue and no pending results.
+
+int PQexitBatchMode(PGconn *conn);
+
+
+
+ Returns 1 for success. Returns 1 and takes no action if not in
+ batch mode. If the current statement isn't finished processing
+ or there are results pending for collection with
+ PQgetResult, returns 0 and does nothing.
+
+
+
+
+
+
+ PQbatchSendQueue
+
+ PQbatchSendQueue
+
+
+
+
+
+   Delimits the end of a set of batched commands by sending a
+   sync message
+   and flushing the send buffer. The end of a batch serves as
+   the delimiter of an implicit transaction and an error recovery
+   point; see the section on error handling above.
+
+
+int PQbatchSendQueue(PGconn *conn);
+
+
+
+   Returns 1 for success. Returns 0 if the connection is not in
+   batch mode or sending the
+   sync message
+   failed.
+
+
+
+
+
+
+
+ When to Use Batching
+
+
+   Much like asynchronous query mode, there is no performance disadvantage to
+   using batching and pipelining. It increases client application complexity,
+   and extra caution is required to prevent client/server deadlocks, but
+   pipelining can sometimes offer considerable performance improvements.
+
+
+
+ Batching is most useful when the server is distant, i.e., network latency
+ (ping time) is high, and also when many small operations
+ are being performed in rapid sequence. There is usually less benefit
+ in using batches when each query takes many multiples of the client/server
+ round-trip time to execute. A 100-statement operation run on a server
+ 300ms round-trip-time away would take 30 seconds in network latency alone
+ without batching; with batching it may spend as little as 0.3s waiting for
+ results from the server.
+
+
+
+ Use batches when your application does lots of small
+ INSERT, UPDATE and
+ DELETE operations that can't easily be transformed
+ into operations on sets or into a COPY operation.
+
+
+
+ Batching is not useful when information from one operation is required by
+ the client to produce the next operation. In such cases, the client
+ must introduce a synchronization point and wait for a full client/server
+ round-trip to get the results it needs. However, it's often possible to
+ adjust the client design to exchange the required information server-side.
+ Read-modify-write cycles are especially good candidates; for example:
+
+BEGIN;
+SELECT x FROM mytable WHERE id = 42 FOR UPDATE;
+-- result: x=2
+-- client adds 1 to x:
+UPDATE mytable SET x = 3 WHERE id = 42;
+COMMIT;
+
+ could be much more efficiently done with:
+
+UPDATE mytable SET x = x + 1 WHERE id = 42;
+
+
+
+
+   Batching is less useful, and more complex, when a single batch contains
+   multiple transactions (see the section on error handling above).
+
+
+
+
+ The batch API was introduced in PostgreSQL 14.0, but clients using
+ the PostgreSQL 14 version of libpq can use
+ batches on server versions 7.4 and newer. Batching works on any server
+ that supports the v3 extended query protocol.
+
+
+
+
+
Retrieving Query Results Row-by-Row
@@ -4859,6 +5359,17 @@ int PQflush(PGconn *conn);
Each object should be freed with as usual.
+
+
+
+   When using batch mode, activate single-row mode for the currently
+   processed batch query by calling PQsetSingleRowMode
+   before retrieving results with PQgetResult.
+   See the section on batch mode for more information.
+
+
+
diff --git a/doc/src/sgml/lobj.sgml b/doc/src/sgml/lobj.sgml
index 6329cf0796..49be8b1dbe 100644
--- a/doc/src/sgml/lobj.sgml
+++ b/doc/src/sgml/lobj.sgml
@@ -130,6 +130,10 @@
libpq library.
+
+ Client applications cannot use these functions while a libpq connection is in batch mode.
+
+
Creating a Large Object
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 7180fedd65..921e5dccd9 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -1058,6 +1058,20 @@ pgbench optionsd
+
+ \beginbatch
+ \endbatch
+
+
+
+   These commands delimit the start and end of a batch of SQL statements.
+   In a batch, statements are sent to the server without waiting for the
+   results of previous statements (see the libpq batch mode documentation).
+   Batching requires the extended query protocol.
+
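+
+   As a sketch, a custom script could batch several statements together;
+   note that a query mode other than simple must be selected, for example
+   with --protocol=prepared:
+
+\beginbatch
+UPDATE pgbench_accounts SET abalance = abalance + 1 WHERE aid = 1;
+UPDATE pgbench_accounts SET abalance = abalance + 2 WHERE aid = 2;
+UPDATE pgbench_accounts SET abalance = abalance + 3 WHERE aid = 3;
+\endbatch
+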
+
+
+
\gset [prefix]
@@ -1086,6 +1100,12 @@ pgbench optionsd
row, the last value is kept.
+
+ \gset and \aset cannot be used
+ inside a batch section, since query results are not immediately
+ fetched in this mode.
+
+
The following example puts the final account balance from the first query
into variable abalance, and fills variables
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 24f8b3e42e..9ae67387a5 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -1026,6 +1026,9 @@ libpqrcv_exec(WalReceiverConn *conn, const char *query,
walres->status = WALRCV_ERROR;
walres->err = pchomp(PQerrorMessage(conn->streamConn));
break;
+ default:
+ /* This is just to keep the compiler quiet */
+ break;
}
PQclear(pgres);
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 3057665bbe..5846e9153e 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -351,7 +351,8 @@ typedef enum
*
* CSTATE_START_COMMAND starts the execution of a command. On a SQL
* command, the command is sent to the server, and we move to
- * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
+ * CSTATE_WAIT_RESULT state unless in batch mode.
+ * On a \sleep meta-command, the timer is set,
* and we enter the CSTATE_SLEEP state to wait for it to expire. Other
* meta-commands are executed immediately. If the command about to start
* is actually beyond the end of the script, advance to CSTATE_END_TX.
@@ -485,7 +486,9 @@ typedef enum MetaCommand
META_IF, /* \if */
META_ELIF, /* \elif */
META_ELSE, /* \else */
- META_ENDIF /* \endif */
+ META_ENDIF, /* \endif */
+ META_BEGINBATCH, /* \beginbatch */
+ META_ENDBATCH /* \endbatch */
} MetaCommand;
typedef enum QueryMode
@@ -2492,6 +2495,10 @@ getMetaCommand(const char *cmd)
mc = META_GSET;
else if (pg_strcasecmp(cmd, "aset") == 0)
mc = META_ASET;
+ else if (pg_strcasecmp(cmd, "beginbatch") == 0)
+ mc = META_BEGINBATCH;
+ else if (pg_strcasecmp(cmd, "endbatch") == 0)
+ mc = META_ENDBATCH;
else
mc = META_NONE;
return mc;
@@ -2681,11 +2688,24 @@ sendCommand(CState *st, Command *command)
if (commands[j]->type != SQL_COMMAND)
continue;
preparedStatementName(name, st->use_file, j);
- res = PQprepare(st->con, name,
- commands[j]->argv[0], commands[j]->argc - 1, NULL);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pg_log_error("%s", PQerrorMessage(st->con));
- PQclear(res);
+ if (PQbatchStatus(st->con) == PQBATCH_MODE_OFF)
+ {
+ res = PQprepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_log_error("%s", PQerrorMessage(st->con));
+ PQclear(res);
+ }
+ else
+ {
+ /*
+ * In batch mode, we use asynchronous functions. If a server-side
+ * error occurs, it will be processed later among the other results.
+ */
+ if (!PQsendPrepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL))
+ pg_log_error("%s", PQerrorMessage(st->con));
+ }
}
st->prepared[st->use_file] = true;
}
@@ -2799,6 +2819,12 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
/* otherwise the result is simply thrown away by PQclear below */
break;
+ case PGRES_BATCH_END:
+ pg_log_debug("client %d batch ending", st->id);
+ if (PQexitBatchMode(st->con) != 1)
+ pg_log_error("client %d failed to exit batch mode", st->id);
+ break;
+
default:
/* anything else is unexpected */
pg_log_error("client %d script %d aborted in command %d query %d: %s",
@@ -3057,13 +3083,27 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
/* Execute the command */
if (command->type == SQL_COMMAND)
{
+ if ((command->meta == META_GSET || command->meta == META_ASET)
+ && PQbatchStatus(st->con) != PQBATCH_MODE_OFF)
+ {
+ commandFailed(st, "SQL", "\\gset and \\aset are not allowed in a batch section");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+
if (!sendCommand(st, command))
{
commandFailed(st, "SQL", "SQL command send failed");
st->state = CSTATE_ABORTED;
}
else
- st->state = CSTATE_WAIT_RESULT;
+ {
+ /* Wait for results, unless in batch mode */
+ if (PQbatchStatus(st->con) == PQBATCH_MODE_OFF)
+ st->state = CSTATE_WAIT_RESULT;
+ else
+ st->state = CSTATE_END_COMMAND;
+ }
}
else if (command->type == META_COMMAND)
{
@@ -3184,6 +3224,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
}
break;
+
/*
* Wait for the current SQL command to complete
*/
@@ -3203,7 +3244,14 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
if (readCommandResponse(st,
sql_script[st->use_file].commands[st->command]->meta,
sql_script[st->use_file].commands[st->command]->varprefix))
- st->state = CSTATE_END_COMMAND;
+ {
+ /*
+ * Outside of batch mode: stop reading results.
+ * In batch mode: continue reading results until an end-of-batch response.
+ */
+ if (PQbatchStatus(st->con) != PQBATCH_MODE_ON)
+ st->state = CSTATE_END_COMMAND;
+ }
else
st->state = CSTATE_ABORTED;
break;
@@ -3447,6 +3495,45 @@ executeMetaCommand(CState *st, instr_time *now)
return CSTATE_ABORTED;
}
}
+ else if (command->meta == META_BEGINBATCH)
+ {
+ /*
+ * In batch mode, we use a workflow based on libpq batch
+ * functions.
+ */
+ if (querymode == QUERY_SIMPLE)
+ {
+ commandFailed(st, "beginbatch", "cannot use batch mode with the simple query protocol");
+ st->state = CSTATE_ABORTED;
+ return CSTATE_ABORTED;
+ }
+
+ if (PQbatchStatus(st->con) != PQBATCH_MODE_OFF)
+ {
+ commandFailed(st, "beginbatch", "already in batch mode");
+ return CSTATE_ABORTED;
+ }
+ if (PQenterBatchMode(st->con) == 0)
+ {
+ commandFailed(st, "beginbatch", "failed to start a batch");
+ return CSTATE_ABORTED;
+ }
+ }
+ else if (command->meta == META_ENDBATCH)
+ {
+ if (PQbatchStatus(st->con) != PQBATCH_MODE_ON)
+ {
+ commandFailed(st, "endbatch", "not in batch mode");
+ return CSTATE_ABORTED;
+ }
+ if (!PQbatchSendQueue(st->con))
+ {
+ commandFailed(st, "endbatch", "failed to end the batch");
+ return CSTATE_ABORTED;
+ }
+ /* collect pending results before getting out of batch mode */
+ return CSTATE_WAIT_RESULT;
+ }
/*
* executing the expression or shell command might have taken a
@@ -4686,6 +4773,12 @@ process_backslash_command(PsqlScanState sstate, const char *source)
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
"too many arguments", NULL, -1);
}
+ else if (my_command->meta == META_BEGINBATCH || my_command->meta == META_ENDBATCH)
+ {
+ if (my_command->argc != 1)
+ syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
+ "unexpected argument", NULL, -1);
+ }
else
{
/* my_command->meta == META_NONE */
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90481..ca86f55652 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,7 @@ PQgetgssctx 176
PQsetSSLKeyPassHook_OpenSSL 177
PQgetSSLKeyPassHook_OpenSSL 178
PQdefaultSSLKeyPassHook_OpenSSL 179
+PQenterBatchMode 180
+PQexitBatchMode 181
+PQbatchSendQueue 182
+PQbatchStatus 183
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index e7781d010f..c673bc405f 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -536,6 +536,23 @@ pqDropConnection(PGconn *conn, bool flushInput)
}
}
+/*
+ * pqFreeCommandQueue
+ * Free all the entries of the PGcommandQueueEntry list passed.
+ */
+static void
+pqFreeCommandQueue(PGcommandQueueEntry *queue)
+{
+ while (queue != NULL)
+ {
+ PGcommandQueueEntry *cur = queue;
+
+ queue = cur->next;
+ if (cur->query)
+ free(cur->query);
+ free(cur);
+ }
+}
/*
* pqDropServerData
@@ -555,6 +572,7 @@ pqDropServerData(PGconn *conn)
{
PGnotify *notify;
pgParameterStatus *pstatus;
+ PGcommandQueueEntry *queue;
/* Forget pending notifies */
notify = conn->notifyHead;
@@ -567,6 +585,14 @@ pqDropServerData(PGconn *conn)
}
conn->notifyHead = conn->notifyTail = NULL;
+ queue = conn->cmd_queue_head;
+ pqFreeCommandQueue(queue);
+ conn->cmd_queue_head = conn->cmd_queue_tail = NULL;
+
+ queue = conn->cmd_queue_recycle;
+ pqFreeCommandQueue(queue);
+ conn->cmd_queue_recycle = NULL;
+
/* Reset ParameterStatus data, as well as variables deduced from it */
pstatus = conn->pstatus;
while (pstatus != NULL)
@@ -6699,6 +6725,15 @@ PQbackendPID(const PGconn *conn)
return conn->be_pid;
}
+int
+PQbatchStatus(const PGconn *conn)
+{
+ if (!conn)
+ return false;
+
+ return conn->batch_status;
+}
+
int
PQconnectionNeedsPassword(const PGconn *conn)
{
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237c3a..93971857d5 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -39,7 +39,9 @@ char *const pgresStatus[] = {
"PGRES_NONFATAL_ERROR",
"PGRES_FATAL_ERROR",
"PGRES_COPY_BOTH",
- "PGRES_SINGLE_TUPLE"
+ "PGRES_SINGLE_TUPLE",
+ "PGRES_BATCH_END",
+ "PGRES_BATCH_ABORTED"
};
/*
@@ -70,6 +72,11 @@ static PGresult *PQexecFinish(PGconn *conn);
static int PQsendDescribe(PGconn *conn, char desc_type,
const char *desc_target);
static int check_field_number(const PGresult *res, int field_num);
+static PGcommandQueueEntry *pqMakePipelineCmd(PGconn *conn);
+static void pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry);
+static void pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry);
+static int pqBatchProcessQueue(PGconn *conn);
+static int pqBatchFlush(PGconn *conn);
/* ----------------
@@ -1210,7 +1217,7 @@ pqRowProcessor(PGconn *conn, const char **errmsgp)
conn->next_result = conn->result;
conn->result = res;
/* And mark the result ready to return */
- conn->asyncStatus = PGASYNC_READY;
+ conn->asyncStatus = PGASYNC_READY_MORE;
}
return 1;
@@ -1233,6 +1240,13 @@ fail:
int
PQsendQuery(PGconn *conn, const char *query)
{
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot PQsendQuery in batch mode, use PQsendQueryParams\n"));
+ return 0;
+ }
+
if (!PQsendQueryStart(conn))
return 0;
@@ -1263,10 +1277,11 @@ PQsendQuery(PGconn *conn, const char *query)
conn->last_query = strdup(query);
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in batch mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqBatchFlush(conn) < 0)
{
/* error message should be set up already */
return 0;
@@ -1331,6 +1346,10 @@ PQsendPrepare(PGconn *conn,
const char *stmtName, const char *query,
int nParams, const Oid *paramTypes)
{
+ PGcommandQueueEntry *pipeCmd = NULL;
+ char **last_query;
+ PGQueryClass *queryclass;
+
if (!PQsendQueryStart(conn))
return 0;
@@ -1362,6 +1381,15 @@ PQsendPrepare(PGconn *conn,
return 0;
}
+ /* Alloc batch memory before doing anything */
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ pipeCmd = pqMakePipelineCmd(conn);
+
+ if (pipeCmd == NULL)
+ return 0; /* error msg already set */
+ }
+
/* construct the Parse message */
if (pqPutMsgStart('P', false, conn) < 0 ||
pqPuts(stmtName, conn) < 0 ||
@@ -1389,31 +1417,47 @@ PQsendPrepare(PGconn *conn,
goto sendFailed;
/* construct the Sync message */
- if (pqPutMsgStart('S', false, conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ if (conn->batch_status == PQBATCH_MODE_OFF)
+ {
+ if (pqPutMsgStart('S', false, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+
+ last_query = &conn->last_query;
+ queryclass = &conn->queryclass;
+ }
+ else
+ {
+ last_query = &pipeCmd->query;
+ queryclass = &pipeCmd->queryclass;
+ }
/* remember we are doing just a Parse */
- conn->queryclass = PGQUERY_PREPARE;
+ *queryclass = PGQUERY_PREPARE;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
- conn->last_query = strdup(query);
+ if (*last_query)
+ free(*last_query);
+ *last_query = strdup(query);
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in batch mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqBatchFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ pqAppendPipelineCmd(conn, pipeCmd);
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecyclePipelineCmd(conn, pipeCmd);
/* error message should be set up already */
return 0;
}
@@ -1461,7 +1505,8 @@ PQsendQueryPrepared(PGconn *conn,
}
/*
- * Common startup code for PQsendQuery and sibling routines
+ * PQsendQueryStart
+ * Common startup code for PQsendQuery and sibling routines
*/
static bool
PQsendQueryStart(PGconn *conn)
@@ -1479,20 +1524,62 @@ PQsendQueryStart(PGconn *conn)
libpq_gettext("no connection to the server\n"));
return false;
}
- /* Can't send while already busy, either. */
- if (conn->asyncStatus != PGASYNC_IDLE)
+
+ /* Can't send while already busy, either, unless enqueuing for later */
+ if (conn->asyncStatus != PGASYNC_IDLE &&
+ conn->batch_status == PQBATCH_MODE_OFF)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("another command is already in progress\n"));
return false;
}
- /* initialize async result-accumulation state */
- pqClearAsyncResult(conn);
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ /*
+ * When enqueuing a message we don't change much of the connection
+ * state since it's already in use for the current command. The
+ * connection state will get updated when pqBatchProcessQueue()
+ * advances to start processing the queued message.
+ *
+ * Just make sure we can safely enqueue given the current connection
+ * state. We can enqueue behind another queue item, or behind a
+ * non-queue command (one that sends its own sync), but we can't
+ * enqueue if the connection is in a copy state.
+ */
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_QUEUED:
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* ok to queue */
+ break;
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot queue commands during COPY\n"));
+ return false;
+ break;
+ case PGASYNC_IDLE:
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("internal error, idle state in batch mode"));
+ break;
+ }
+ }
+ else
+ {
+ /*
+ * This command's results will come in immediately. Initialize async
+ * result-accumulation state
+ */
+ pqClearAsyncResult(conn);
- /* reset single-row processing mode */
- conn->singleRowMode = false;
+ /* reset single-row processing mode */
+ conn->singleRowMode = false;
+ }
/* ready to send command message */
return true;
}
@@ -1516,6 +1603,10 @@ PQsendQueryGuts(PGconn *conn,
int resultFormat)
{
int i;
+ PGcommandQueueEntry *pipeCmd = NULL;
+ char **last_query;
+ PGQueryClass *queryclass;
+
/* This isn't gonna work on a 2.0 server */
if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
@@ -1525,6 +1616,23 @@ PQsendQueryGuts(PGconn *conn,
return 0;
}
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ pipeCmd = pqMakePipelineCmd(conn);
+
+ if (pipeCmd == NULL)
+ return 0; /* error msg already set */
+
+ last_query = &pipeCmd->query;
+ queryclass = &pipeCmd->queryclass;
+ }
+ else
+ {
+ last_query = &conn->last_query;
+ queryclass = &conn->queryclass;
+ }
+
+
/*
* We will send Parse (if needed), Bind, Describe Portal, Execute, Sync,
* using specified statement name and the unnamed portal.
@@ -1637,35 +1745,43 @@ PQsendQueryGuts(PGconn *conn,
pqPutMsgEnd(conn) < 0)
goto sendFailed;
- /* construct the Sync message */
- if (pqPutMsgStart('S', false, conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ if (conn->batch_status == PQBATCH_MODE_OFF)
+ {
+ /* construct the Sync message */
+ if (pqPutMsgStart('S', false, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are using extended query protocol */
- conn->queryclass = PGQUERY_EXTENDED;
+ *queryclass = PGQUERY_EXTENDED;
/* and remember the query text too, if possible */
/* if insufficient memory, last_query just winds up NULL */
- if (conn->last_query)
- free(conn->last_query);
+ if (*last_query)
+ free(*last_query);
if (command)
- conn->last_query = strdup(command);
+ *last_query = strdup(command);
else
- conn->last_query = NULL;
+ *last_query = NULL;
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in batch mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqBatchFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ pqAppendPipelineCmd(conn, pipeCmd);
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecyclePipelineCmd(conn, pipeCmd);
/* error message should be set up already */
return 0;
}
@@ -1766,14 +1882,17 @@ PQisBusy(PGconn *conn)
return conn->asyncStatus == PGASYNC_BUSY || conn->write_failed;
}
-
/*
* PQgetResult
* Get the next PGresult produced by a query. Returns NULL if no
* query work remains or an error has occurred (e.g. out of
* memory).
+ *
+ * In batch mode, once all the results of a query have been returned,
+ * PQgetResult returns NULL to let the user know that the next batched
+ * query is being processed. At the end of the batch, it returns an
+ * end-of-batch result with PQresultStatus(result) == PGRES_BATCH_END.
*/
-
PGresult *
PQgetResult(PGconn *conn)
{
@@ -1842,9 +1961,38 @@ PQgetResult(PGconn *conn)
switch (conn->asyncStatus)
{
case PGASYNC_IDLE:
+ case PGASYNC_QUEUED:
res = NULL; /* query is complete */
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ /*
+ * In batch mode, we prepare the processing of the results of
+ * the next query.
+ */
+ pqBatchProcessQueue(conn);
+ }
break;
case PGASYNC_READY:
+ res = pqPrepareAsyncResult(conn);
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ /*
+ * In batch mode, the query execution state cannot be IDLE, as
+ * there can be other queries or results waiting in the queue.
+ *
+ * The connection isn't idle, since we can't submit new
+ * non-batched commands. Nor is it busy, since the current
+ * command is done and we need to process a new one.
+ */
+ conn->asyncStatus = PGASYNC_QUEUED;
+ }
+ else
+ {
+ /* Set the state back to BUSY, allowing parsing to proceed. */
+ conn->asyncStatus = PGASYNC_BUSY;
+ }
+ break;
+ case PGASYNC_READY_MORE:
res = pqPrepareAsyncResult(conn);
/* Set the state back to BUSY, allowing parsing to proceed. */
conn->asyncStatus = PGASYNC_BUSY;
@@ -2025,6 +2173,13 @@ PQexecStart(PGconn *conn)
if (!conn)
return false;
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("synchronous command execution functions are not allowed in batch mode\n"));
+ return false;
+ }
+
/*
* Silently discard any prior query result that application didn't eat.
* This is probably poor design, but it's here for backward compatibility.
@@ -2219,6 +2374,9 @@ PQsendDescribePortal(PGconn *conn, const char *portal)
static int
PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
{
+ PGcommandQueueEntry *pipeCmd = NULL;
+ PGQueryClass *queryclass;
+
/* Treat null desc_target as empty string */
if (!desc_target)
desc_target = "";
@@ -2234,6 +2392,18 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
return 0;
}
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ pipeCmd = pqMakePipelineCmd(conn);
+
+ if (pipeCmd == NULL)
+ return 0; /* error msg already set */
+
+ queryclass = &pipeCmd->queryclass;
+ }
+ else
+ queryclass = &conn->queryclass;
+
/* construct the Describe message */
if (pqPutMsgStart('D', false, conn) < 0 ||
pqPutc(desc_type, conn) < 0 ||
@@ -2242,32 +2412,40 @@ PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target)
goto sendFailed;
/* construct the Sync message */
- if (pqPutMsgStart('S', false, conn) < 0 ||
- pqPutMsgEnd(conn) < 0)
- goto sendFailed;
+ if (conn->batch_status == PQBATCH_MODE_OFF)
+ {
+ if (pqPutMsgStart('S', false, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+ }
/* remember we are doing a Describe */
- conn->queryclass = PGQUERY_DESCRIBE;
+ *queryclass = PGQUERY_DESCRIBE;
- /* reset last_query string (not relevant now) */
- if (conn->last_query)
+ /* reset last-query string (not relevant now) */
+ if (conn->last_query && conn->batch_status != PQBATCH_MODE_OFF)
{
free(conn->last_query);
conn->last_query = NULL;
}
/*
- * Give the data a push. In nonblock mode, don't complain if we're unable
- * to send it all; PQgetResult() will do any additional flushing needed.
+ * Give the data a push (in batch mode, only if we're past the size
+ * threshold). In nonblock mode, don't complain if we're unable to send
+ * it all; PQgetResult() will do any additional flushing needed.
*/
- if (pqFlush(conn) < 0)
+ if (pqBatchFlush(conn) < 0)
goto sendFailed;
/* OK, it's launched! */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ pqAppendPipelineCmd(conn, pipeCmd);
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
return 1;
sendFailed:
+ pqRecyclePipelineCmd(conn, pipeCmd);
/* error message should be set up already */
return 0;
}
@@ -2665,6 +2843,13 @@ PQfn(PGconn *conn,
/* clear the error string */
resetPQExpBuffer(&conn->errorMessage);
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("PQfn not allowed in batch mode\n"));
+ return NULL;
+ }
+
if (conn->sock == PGINVALID_SOCKET || conn->asyncStatus != PGASYNC_IDLE ||
conn->result != NULL)
{
@@ -2685,6 +2870,377 @@ PQfn(PGconn *conn,
args, nargs);
}
+/* ====== Batch mode support ======== */
+
+/*
+ * PQenterBatchMode
+ * Put an idle connection in batch mode.
+ *
+ * Returns 1 on success. On failure, errorMessage is set and 0 is returned.
+ *
+ * Commands submitted after this can be pipelined on the connection;
+ * there's no requirement to wait for one to finish before the next is
+ * dispatched.
+ *
+ * Queuing of a new query or syncing during COPY is not allowed.
+ *
+ * A set of commands is terminated by a PQbatchSendQueue. Multiple sets of
+ * batched commands may be sent while in batch mode. Batch mode can be exited
+ * by calling PQexitBatchMode() once all results are processed.
+ *
+ * This doesn't actually send anything on the wire, it just puts libpq
+ * into a state where it can pipeline work.
+ */
+int
+PQenterBatchMode(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+
+ /* succeed with no action if already in batch mode */
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ return 1;
+
+ if (conn->asyncStatus != PGASYNC_IDLE)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot enter batch mode, connection not idle\n"));
+ return 0;
+ }
+
+ conn->batch_status = PQBATCH_MODE_ON;
+ conn->asyncStatus = PGASYNC_QUEUED;
+
+ return 1;
+}
+
+/*
+ * PQexitBatchMode
+ * End batch mode and return to normal command mode.
+ *
+ * Returns 1 on success (batch mode was ended, or the connection was not in
+ * batch mode to begin with).
+ *
+ * Returns 0 if the connection is in batch mode and it cannot be ended yet;
+ * the error message will be set.
+ */
+int
+PQexitBatchMode(PGconn *conn)
+{
+ if (!conn)
+ return 0;
+
+ if (conn->batch_status == PQBATCH_MODE_OFF)
+ return 1;
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ /* there are some uncollected results */
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot exit batch mode with uncollected results\n"));
+ return 0;
+
+ case PGASYNC_BUSY:
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot exit batch mode while busy\n"));
+ return 0;
+
+ default:
+ /* OK */
+ break;
+ }
+
+ /* still work to process */
+ if (conn->cmd_queue_head != NULL)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("command queue not clean"));
+ return 0;
+ }
+
+ conn->batch_status = PQBATCH_MODE_OFF;
+ conn->asyncStatus = PGASYNC_IDLE;
+
+ /* Flush any pending data in out buffer */
+ if (pqFlush(conn) < 0)
+ return 0; /* error message is setup already */
+ return 1;
+}
+
+/*
+ * internal function pqBatchProcessQueue
+ *
+ * In batch mode, start processing the next query in the queue.
+ *
+ * Returns 1 if the next query was popped from the queue and can
+ * be processed by PQconsumeInput, PQgetResult, etc.
+ *
+ * Returns 0 if the current query isn't done yet, the connection
+ * is not in a batch, or there are no more queries to process.
+ */
+static int
+pqBatchProcessQueue(PGconn *conn)
+{
+ PGcommandQueueEntry *next_query;
+
+ if (!conn)
+ return 0;
+
+ if (conn->batch_status == PQBATCH_MODE_OFF)
+ return 0;
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ /* should be unreachable */
+ printfPQExpBuffer(&conn->errorMessage,
+ "internal error, COPY in batch mode");
+ break;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ /* client still has to process current query or results */
+ return 0;
+ break;
+ case PGASYNC_IDLE:
+ /* should be unreachable */
+ printfPQExpBuffer(&conn->errorMessage,
+ "internal error, IDLE in batch mode");
+ break;
+ case PGASYNC_QUEUED:
+ /* next query please */
+ break;
+ }
+
+ if (conn->cmd_queue_head == NULL)
+ {
+ /*
+ * In batch mode but nothing left on the queue; caller can submit more
+ * work or PQexitBatchMode() now.
+ */
+ return 0;
+ }
+
+ /*
+ * Pop the next query from the queue and set up the connection state as if
+ * it'd just been dispatched from a non-batched call
+ */
+ next_query = conn->cmd_queue_head;
+ conn->cmd_queue_head = next_query->next;
+ next_query->next = NULL;
+
+ /* Initialize async result-accumulation state */
+ pqClearAsyncResult(conn);
+
+ /* reset single-row processing mode XXX why?? */
+ conn->singleRowMode = false;
+
+
+ conn->last_query = next_query->query;
+ next_query->query = NULL;
+ conn->queryclass = next_query->queryclass;
+
+ pqRecyclePipelineCmd(conn, next_query);
+
+ if (conn->batch_status == PQBATCH_MODE_ABORTED &&
+ conn->queryclass != PGQUERY_SYNC)
+ {
+ /*
+ * In an aborted batch we don't get anything from the server for each
+ * result; we're just discarding input until we get to the next sync
+ * from the server. The client needs to know its queries got aborted
+ * so we create a fake PGresult to return immediately from
+ * PQgetResult.
+ */
+ conn->result =
+ PQmakeEmptyPGresult(conn, PGRES_BATCH_ABORTED);
+ if (!conn->result)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("out of memory"));
+ pqSaveErrorResult(conn);
+ return 0;
+ }
+ conn->asyncStatus = PGASYNC_READY;
+ }
+ else
+ {
+ /* allow parsing to continue */
+ conn->asyncStatus = PGASYNC_BUSY;
+ }
+
+ return 1;
+}
+
+/*
+ * PQbatchSendQueue
+ * End a batch submission.
+ *
+ * It's legal to start submitting another batch immediately, without
+ * waiting for the results of the current batch. There's no need to end batch
+ * mode and start it again.
+ *
+ * If a command in a batch fails, every subsequent command gets a
+ * PGRES_BATCH_ABORTED result, up to (but not including) the terminating
+ * sync, whose result is reported as PGRES_BATCH_END. If the whole batch is
+ * processed without error, a PGresult with PGRES_BATCH_END is likewise
+ * produced at the end.
+ *
+ * Queries can already have been sent before PQbatchSendQueue is called, but
+ * PQbatchSendQueue needs to be called before retrieving command results.
+ *
+ * The connection will remain in batch mode and unavailable for new synchronous
+ * command execution functions until all results from the batch are processed
+ * by the client.
+ */
+int
+PQbatchSendQueue(PGconn *conn)
+{
+ PGcommandQueueEntry *entry;
+
+ if (!conn)
+ return 0;
+
+ if (conn->batch_status == PQBATCH_MODE_OFF)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("cannot send batch when not in batch mode\n"));
+ return 0;
+ }
+
+ switch (conn->asyncStatus)
+ {
+ case PGASYNC_IDLE:
+ /* should be unreachable */
+ printfPQExpBuffer(&conn->errorMessage,
+ "internal error: cannot send batch when idle\n");
+ return 0;
+ break;
+ case PGASYNC_COPY_IN:
+ case PGASYNC_COPY_OUT:
+ case PGASYNC_COPY_BOTH:
+ /* should be unreachable */
+ printfPQExpBuffer(&conn->errorMessage,
+ "internal error: cannot send batch while in COPY\n");
+ return 0;
+ break;
+ case PGASYNC_READY:
+ case PGASYNC_READY_MORE:
+ case PGASYNC_BUSY:
+ case PGASYNC_QUEUED:
+ /* can send sync to end this batch of cmds */
+ break;
+ }
+
+ entry = pqMakePipelineCmd(conn);
+ if (entry == NULL)
+ return 0; /* error msg already set */
+
+ entry->queryclass = PGQUERY_SYNC;
+ entry->query = NULL;
+
+ /* construct the Sync message */
+ if (pqPutMsgStart('S', false, conn) < 0 ||
+ pqPutMsgEnd(conn) < 0)
+ goto sendFailed;
+
+ pqAppendPipelineCmd(conn, entry);
+
+ /*
+ * Give the data a push. In nonblock mode, don't complain if we're unable
+ * to send it all; PQgetResult() will do any additional flushing needed.
+ */
+ if (PQflush(conn) < 0)
+ goto sendFailed;
+
+ /*
+ * Call pqBatchProcessQueue so the user can start calling PQgetResult.
+ */
+ pqBatchProcessQueue(conn);
+
+ return 1;
+
+sendFailed:
+ pqRecyclePipelineCmd(conn, entry);
+ /* error message should be set up already */
+ return 0;
+}
+
+/*
+ * pqMakePipelineCmd
+ * Get a command queue entry for caller to fill.
+ *
+ * If the recycle queue has a free element, that is returned; if not, a
+ * fresh one is allocated. Caller is responsible for adding it to the
+ * command queue (pqAppendPipelineCmd) once the struct is filled in, or
+ * releasing the memory (pqRecyclePipelineCmd) if an error occurs.
+ *
+ * If allocation fails, sets the error message and returns NULL.
+ */
+static PGcommandQueueEntry *
+pqMakePipelineCmd(PGconn *conn)
+{
+ PGcommandQueueEntry *entry;
+
+ if (conn->cmd_queue_recycle == NULL)
+ {
+ entry = (PGcommandQueueEntry *) malloc(sizeof(PGcommandQueueEntry));
+ if (entry == NULL)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ libpq_gettext("out of memory\n"));
+ return NULL;
+ }
+ }
+ else
+ {
+ entry = conn->cmd_queue_recycle;
+ conn->cmd_queue_recycle = entry->next;
+ }
+ entry->next = NULL;
+ entry->query = NULL;
+
+ return entry;
+}
+
+/*
+ * pqAppendPipelineCmd
+ * Append a precreated command queue entry to the queue after it's been
+ * sent successfully.
+ */
+static void
+pqAppendPipelineCmd(PGconn *conn, PGcommandQueueEntry *entry)
+{
+ if (conn->cmd_queue_head == NULL)
+ conn->cmd_queue_head = entry;
+ else
+ conn->cmd_queue_tail->next = entry;
+ conn->cmd_queue_tail = entry;
+}
+
+/*
+ * pqRecyclePipelineCmd
+ * Push a command queue entry onto the freelist. It must be an entry
+ * with null next pointer and not referenced by any other entry's next
+ * pointer.
+ */
+static void
+pqRecyclePipelineCmd(PGconn *conn, PGcommandQueueEntry *entry)
+{
+ if (entry == NULL)
+ return;
+
+ Assert(entry->next == NULL);
+
+ if (entry->query)
+ free(entry->query);
+
+ entry->next = conn->cmd_queue_recycle;
+ conn->cmd_queue_recycle = entry;
+}
+
/* ====== accessor funcs for PGresult ======== */
@@ -3285,6 +3841,23 @@ PQflush(PGconn *conn)
return pqFlush(conn);
}
+/*
+ * pqBatchFlush
+ *
+ * In batch mode, data is flushed only when the output buffer reaches the
+ * OUTBUFFER_THRESHOLD size. In non-batch mode, data is always flushed.
+ *
+ * Returns 0 on success.
+ */
+static int
+pqBatchFlush(PGconn *conn)
+{
+ if ((conn->batch_status == PQBATCH_MODE_OFF) ||
+ (conn->outCount >= OUTBUFFER_THRESHOLD))
+ return (pqFlush(conn));
+ return 0;
+}
+
/*
* PQfreemem - safely frees memory allocated
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 9360c541be..2ff3fa4883 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -406,6 +406,12 @@ pqParseInput2(PGconn *conn)
{
char id;
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ fprintf(stderr, "internal error, attempt to read v2 protocol in batch mode\n");
+ abort();
+ }
+
/*
* Loop to parse successive complete messages available in the buffer.
*/
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525475..da38e6aed1 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -217,10 +217,19 @@ pqParseInput3(PGconn *conn)
return;
conn->asyncStatus = PGASYNC_READY;
break;
- case 'Z': /* backend is ready for new query */
+ case 'Z': /* sync response, backend is ready for new
+ * query */
if (getReadyForQuery(conn))
return;
- conn->asyncStatus = PGASYNC_IDLE;
+ if (conn->batch_status != PQBATCH_MODE_OFF)
+ {
+ conn->batch_status = PQBATCH_MODE_ON;
+ conn->result = PQmakeEmptyPGresult(conn,
+ PGRES_BATCH_END);
+ conn->asyncStatus = PGASYNC_READY;
+ }
+ else
+ conn->asyncStatus = PGASYNC_IDLE;
break;
case 'I': /* empty query */
if (conn->result == NULL)
@@ -875,6 +884,9 @@ pqGetErrorNotice3(PGconn *conn, bool isError)
PQExpBufferData workBuf;
char id;
+ if (isError && conn->batch_status != PQBATCH_MODE_OFF)
+ conn->batch_status = PQBATCH_MODE_ABORTED;
+
/*
* If this is an error message, pre-emptively clear any incomplete query
* result we may have. We'd just throw it away below anyway, and
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 3b6a9fbce3..fcdd887958 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -97,7 +97,10 @@ typedef enum
PGRES_NONFATAL_ERROR, /* notice or warning message */
PGRES_FATAL_ERROR, /* query failed */
PGRES_COPY_BOTH, /* Copy In/Out data transfer in progress */
- PGRES_SINGLE_TUPLE /* single tuple from larger resultset */
+ PGRES_SINGLE_TUPLE, /* single tuple from larger resultset */
+ PGRES_BATCH_END, /* end of a batch of commands */
+ PGRES_BATCH_ABORTED, /* Command didn't run because of an abort
+ * earlier in a batch */
} ExecStatusType;
typedef enum
@@ -137,6 +140,17 @@ typedef enum
PQPING_NO_ATTEMPT /* connection not attempted (bad params) */
} PGPing;
+/*
+ * PQBatchStatus - Current status of batch mode
+ */
+
+typedef enum
+{
+ PQBATCH_MODE_OFF,
+ PQBATCH_MODE_ON,
+ PQBATCH_MODE_ABORTED
+} PQBatchStatus;
+
/* PGconn encapsulates a connection to the backend.
* The contents of this struct are not supposed to be known to applications.
*/
@@ -328,6 +342,7 @@ extern int PQserverVersion(const PGconn *conn);
extern char *PQerrorMessage(const PGconn *conn);
extern int PQsocket(const PGconn *conn);
extern int PQbackendPID(const PGconn *conn);
+extern int PQbatchStatus(const PGconn *conn);
extern int PQconnectionNeedsPassword(const PGconn *conn);
extern int PQconnectionUsedPassword(const PGconn *conn);
extern int PQclientEncoding(const PGconn *conn);
@@ -435,6 +450,11 @@ extern PGresult *PQgetResult(PGconn *conn);
extern int PQisBusy(PGconn *conn);
extern int PQconsumeInput(PGconn *conn);
+/* Routines for batch mode management */
+extern int PQenterBatchMode(PGconn *conn);
+extern int PQexitBatchMode(PGconn *conn);
+extern int PQbatchSendQueue(PGconn *conn);
+
/* LISTEN/NOTIFY support */
extern PGnotify *PQnotifies(PGconn *conn);
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae295..d2b26f2299 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -217,10 +217,15 @@ typedef enum
{
PGASYNC_IDLE, /* nothing's happening, dude */
PGASYNC_BUSY, /* query in progress */
- PGASYNC_READY, /* result ready for PQgetResult */
+ PGASYNC_READY, /* query done, waiting for client to fetch
+ * result */
+ PGASYNC_READY_MORE, /* query done, waiting for client to fetch
+ * result; more results expected from this
+ * query */
PGASYNC_COPY_IN, /* Copy In data transfer in progress */
PGASYNC_COPY_OUT, /* Copy Out data transfer in progress */
- PGASYNC_COPY_BOTH /* Copy In/Out data transfer in progress */
+ PGASYNC_COPY_BOTH, /* Copy In/Out data transfer in progress */
+ PGASYNC_QUEUED /* Current query done, more in queue */
} PGAsyncStatusType;
/* PGQueryClass tracks which query protocol we are now executing */
@@ -229,7 +234,8 @@ typedef enum
PGQUERY_SIMPLE, /* simple Query protocol (PQexec) */
PGQUERY_EXTENDED, /* full Extended protocol (PQexecParams) */
PGQUERY_PREPARE, /* Parse only (PQprepare) */
- PGQUERY_DESCRIBE /* Describe Statement or Portal */
+ PGQUERY_DESCRIBE, /* Describe Statement or Portal */
+ PGQUERY_SYNC /* A protocol sync to end a batch */
} PGQueryClass;
/* PGSetenvStatusType defines the state of the pqSetenv state machine */
@@ -301,6 +307,22 @@ typedef enum pg_conn_host_type
CHT_UNIX_SOCKET
} pg_conn_host_type;
+/*
+ * An entry in the pending command queue. Used by batch mode to keep track
+ * of the expected results of commands we've already dispatched.
+ *
+ * Note that entries popped off the head of this list are not freed; they
+ * are pushed onto the cmd_queue_recycle freelist for reuse. New entries
+ * are appended at cmd_queue_tail in pg_conn.
+ */
+typedef struct pgCommandQueueEntry
+{
+ PGQueryClass queryclass; /* Query type; PGQUERY_SYNC for sync msg */
+ char *query; /* SQL command, or NULL if unknown */
+ struct pgCommandQueueEntry *next;
+} PGcommandQueueEntry;
+
+
/*
* pg_conn_host stores all information about each of possibly several hosts
* mentioned in the connection string. Most fields are derived by splitting
@@ -394,6 +416,7 @@ struct pg_conn
bool options_valid; /* true if OK to attempt connection */
bool nonblocking; /* whether this connection is using nonblock
* sending semantics */
+ PQBatchStatus batch_status; /* batch (pipelining) mode status of connection */
bool singleRowMode; /* return current query result row-by-row? */
char copy_is_binary; /* 1 = copy binary, 0 = copy text */
int copy_already_done; /* # bytes already returned in COPY OUT */
@@ -406,6 +429,16 @@ struct pg_conn
pg_conn_host *connhost; /* details about each named host */
char *connip; /* IP address for current network connection */
+ /*
+ * The command queue
+ *
+ * head is the next pending cmd, tail is where we append new commands.
+ * Freed entries for recycling go on the recycle linked list.
+ */
+ PGcommandQueueEntry *cmd_queue_head;
+ PGcommandQueueEntry *cmd_queue_tail;
+ PGcommandQueueEntry *cmd_queue_recycle;
+
/* Connection data */
pgsocket sock; /* FD for socket, PGINVALID_SOCKET if
* unconnected */
@@ -798,6 +831,11 @@ extern ssize_t pg_GSS_read(PGconn *conn, void *ptr, size_t len);
*/
#define pqIsnonblocking(conn) ((conn)->nonblocking)
+/*
+ * Connection's outbuffer threshold.
+ */
+#define OUTBUFFER_THRESHOLD 65536
+
#ifdef ENABLE_NLS
extern char *libpq_gettext(const char *msgid) pg_attribute_format_arg(1);
extern char *libpq_ngettext(const char *msgid, const char *msgid_plural, unsigned long n) pg_attribute_format_arg(1) pg_attribute_format_arg(2);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index a6d2ffbf9e..287214c544 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -17,6 +17,7 @@ SUBDIRS = \
test_extensions \
test_ginpostinglist \
test_integerset \
+ test_libpq \
test_misc \
test_parser \
test_pg_dump \
diff --git a/src/test/modules/test_libpq/.gitignore b/src/test/modules/test_libpq/.gitignore
new file mode 100644
index 0000000000..11e8463984
--- /dev/null
+++ b/src/test/modules/test_libpq/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
+/testlibpqbatch
diff --git a/src/test/modules/test_libpq/Makefile b/src/test/modules/test_libpq/Makefile
new file mode 100644
index 0000000000..6d3a0ea4d5
--- /dev/null
+++ b/src/test/modules/test_libpq/Makefile
@@ -0,0 +1,20 @@
+# src/test/modules/test_libpq/Makefile
+
+PROGRAM = testlibpqbatch
+OBJS = testlibpqbatch.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS += $(libpq)
+
+TAP_TESTS = 1
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_libpq
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_libpq/README b/src/test/modules/test_libpq/README
new file mode 100644
index 0000000000..d8174dd579
--- /dev/null
+++ b/src/test/modules/test_libpq/README
@@ -0,0 +1 @@
+Test programs and libraries for libpq
diff --git a/src/test/modules/test_libpq/t/001_libpq_async.pl b/src/test/modules/test_libpq/t/001_libpq_async.pl
new file mode 100644
index 0000000000..0b27896a2a
--- /dev/null
+++ b/src/test/modules/test_libpq/t/001_libpq_async.pl
@@ -0,0 +1,26 @@
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 6;
+use Cwd;
+
+my $node = get_new_node('main');
+$node->init;
+$node->start;
+
+my $numrows = 10000;
+my @tests =
+ qw(disallowed_in_batch simple_batch multi_batch batch_abort
+ batch_insert singlerow);
+$ENV{PATH} = "$ENV{PATH}:" . getcwd();
+for my $testname (@tests)
+{
+ $node->command_ok(
+ [ 'testlibpqbatch', $testname, $node->connstr('postgres'), $numrows ],
+ "testlibpqbatch $testname");
+}
+
+$node->stop('fast');
diff --git a/src/test/modules/test_libpq/testlibpqbatch.c b/src/test/modules/test_libpq/testlibpqbatch.c
new file mode 100644
index 0000000000..7157e6cb90
--- /dev/null
+++ b/src/test/modules/test_libpq/testlibpqbatch.c
@@ -0,0 +1,963 @@
+/*
+ * src/test/modules/test_libpq/testlibpqbatch.c
+ * Verify libpq batch execution functionality
+ */
+#include "postgres_fe.h"
+
+#include <sys/time.h>
+
+#include "catalog/pg_type_d.h"
+#include "common/fe_memutils.h"
+#include "libpq-fe.h"
+#include "portability/instr_time.h"
+
+
+static void exit_nicely(PGconn *conn);
+
+const char *const progname = "testlibpqbatch";
+
+
+#define DEBUG
+#ifdef DEBUG
+#define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
+#else
+#define pg_debug(...)
+#endif
+
+static const char *const drop_table_sql =
+"DROP TABLE IF EXISTS batch_demo";
+static const char *const create_table_sql =
+"CREATE UNLOGGED TABLE batch_demo(id serial primary key, itemno integer);";
+static const char *const insert_sql =
+"INSERT INTO batch_demo(itemno) VALUES ($1);";
+
+/* max char length of an int32, plus sign and null terminator */
+#define MAXINTLEN 12
+
+static void
+exit_nicely(PGconn *conn)
+{
+ PQfinish(conn);
+ exit(1);
+}
+
+/*
+ * Print an error to stderr and terminate the program.
+ */
+#define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
+static void
+pg_fatal_impl(int line, const char *fmt,...)
+{
+ va_list args;
+
+ fprintf(stderr, "\n"); /* XXX hack */
+ fprintf(stderr, "%s:%d: ", progname, line);
+
+ va_start(args, fmt);
+ vfprintf(stderr, fmt, args);
+ va_end(args);
+ printf("Failure, exiting\n");
+ exit(1);
+}
+
+static void
+test_disallowed_in_batch(PGconn *conn)
+{
+ PGresult *res = NULL;
+
+ fprintf(stderr, "test error cases... ");
+
+ if (PQisnonblocking(conn))
+ pg_fatal("Expected blocking connection mode\n");
+
+ if (PQenterBatchMode(conn) != 1)
+ pg_fatal("Unable to enter batch mode\n");
+
+ if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+ pg_fatal("Batch mode not activated properly\n");
+
+ /* PQexec should fail in batch mode */
+ res = PQexec(conn, "SELECT 1");
+ if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+ pg_fatal("PQexec should fail in batch mode but succeeded\n");
+
+ /* So should PQsendQuery */
+ if (PQsendQuery(conn, "SELECT 1") != 0)
+ pg_fatal("PQsendQuery should fail in batch mode but succeeded\n");
+
+ /* Entering batch mode when already in batch mode is OK */
+ if (PQenterBatchMode(conn) != 1)
+ pg_fatal("re-entering batch mode should be a no-op but failed\n");
+
+ if (PQisBusy(conn) != 0)
+ pg_fatal("PQisBusy should return 0 when idle in batch, returned 1\n");
+
+ /* ok, back to normal command mode */
+ if (PQexitBatchMode(conn) != 1)
+ pg_fatal("couldn't exit idle empty batch mode\n");
+
+ if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+ pg_fatal("Batch mode not terminated properly\n");
+
+ /* exiting batch mode when not in batch mode should be a no-op */
+ if (PQexitBatchMode(conn) != 1)
+ pg_fatal("batch mode exit when not in batch mode should succeed but failed\n");
+
+ /* can now PQexec again */
+ res = PQexec(conn, "SELECT 1");
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("PQexec should succeed after exiting batch mode but failed with: %s\n",
+ PQerrorMessage(conn));
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_simple_batch(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+
+ fprintf(stderr, "simple batch... ");
+
+ /*
+ * Enter batch mode and dispatch a set of operations, which we'll then
+ * process the results of as they come in.
+ *
+ * For a simple case we should be able to do this without interim
+ * processing of results, since our output buffer has enough slack that
+ * we won't block on sending. So blocking mode is fine.
+ */
+ if (PQisnonblocking(conn))
+ pg_fatal("Expected blocking connection mode\n");
+
+ if (PQenterBatchMode(conn) != 1)
+ pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1",
+ 1, dummy_param_oids, dummy_params,
+ NULL, NULL, 0) != 1)
+ pg_fatal("dispatching SELECT failed: %s\n", PQerrorMessage(conn));
+
+ if (PQexitBatchMode(conn) != 0)
+ pg_fatal("exiting batch mode with work in progress should fail, but succeeded\n");
+
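+ /*
+ * Mark the end of the batch; its sync will show up later as a
+ * PGRES_BATCH_END result.
+ */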
+ if (PQbatchSendQueue(conn) != 1)
+ pg_fatal("Ending a batch failed: %s\n", PQerrorMessage(conn));
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a batch item: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first batch item\n",
+ PQresStatus(PQresultStatus(res)));
+
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after first query result.\n");
+
+ /*
+ * Even though we've processed the result there's still a sync to come and
+ * we can't exit batch mode yet
+ */
+ if (PQexitBatchMode(conn) != 0)
+ pg_fatal("exiting batch mode after query but before sync succeeded incorrectly\n");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when sync result PGRES_BATCH_END expected: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_BATCH_END)
+ pg_fatal("Unexpected result code %s instead of sync result, error: %s\n",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after end batch call\n");
+
+ /* We're still in a batch... */
+ if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+ pg_fatal("Fell out of batch mode somehow\n");
+
+ /* ... until we end it, which we can safely do now */
+ if (PQexitBatchMode(conn) != 1)
+ pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+ pg_fatal("Exiting batch mode didn't seem to work\n");
+
+ fprintf(stderr, "ok\n");
+}
+
+static void
+test_multi_batch(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+
+ fprintf(stderr, "multi batch... ");
+
+ /*
+ * Queue up a couple of small batches and process each without returning
+ * to command mode first.
+ */
+ if (PQenterBatchMode(conn) != 1)
+ pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching first SELECT failed: %s\n", PQerrorMessage(conn));
+
+ if (PQbatchSendQueue(conn) != 1)
+ pg_fatal("Ending first batch failed: %s\n", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second SELECT failed: %s\n", PQerrorMessage(conn));
+
+ if (PQbatchSendQueue(conn) != 1)
+ pg_fatal("Ending second batch failed: %s\n", PQerrorMessage(conn));
+
+ /* OK, start processing the batch results */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a batch item: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from first batch item\n",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+ res = NULL;
+
+ if (PQgetResult(conn) != NULL)
+ pg_fatal("PQgetResult returned something extra after first result\n");
+
+ if (PQexitBatchMode(conn) != 0)
+ pg_fatal("exiting batch mode after query but before sync succeeded incorrectly\n");
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when sync result expected: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_BATCH_END)
+ pg_fatal("Unexpected result code %s instead of sync result, error: %s\n",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+
+ PQclear(res);
+
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("Expected null result, got %s\n",
+ PQresStatus(PQresultStatus(res)));
+
+ /* second batch */
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a batch item: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Unexpected result code %s from second batch item\n",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("Expected null result, got %s\n",
+ PQresStatus(PQresultStatus(res)));
+
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("PQgetResult returned null when there's a batch item: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQresultStatus(res) != PGRES_BATCH_END)
+ pg_fatal("Unexpected result code %s from second end batch\n",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* We're still in a batch... */
+ if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+ pg_fatal("Fell out of batch mode somehow\n");
+
+ /* until we end it, which we can safely do now */
+ if (PQexitBatchMode(conn) != 1)
+ pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+ pg_fatal("exiting batch mode didn't seem to work\n");
+
+ fprintf(stderr, "ok\n");
+}
+
+/*
+ * When an operation in a batch fails, the rest of the batch is skipped. We
+ * still have to collect a result for each batch item, but each skipped item
+ * will just report a PGRES_BATCH_ABORTED code.
+ *
+ * This intentionally doesn't use a transaction to wrap the batch. You should
+ * usually use an xact, but in this case we want to observe the effects of each
+ * statement.
+ */
+static void
+test_batch_abort(PGconn *conn)
+{
+ PGresult *res = NULL;
+ const char *dummy_params[1] = {"1"};
+ Oid dummy_param_oids[1] = {INT4OID};
+ int i;
+
+ fprintf(stderr, "aborted batch... ");
+
+ res = PQexec(conn, drop_table_sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("dispatching DROP TABLE failed: %s\n", PQerrorMessage(conn));
+
+ res = PQexec(conn, create_table_sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("dispatching CREATE TABLE failed: %s\n", PQerrorMessage(conn));
+
+ /*
+ * Queue up a couple of small batches and process each without returning
+ * to command mode first. Make sure the second operation in the first
+ * batch ERRORs.
+ */
+ if (PQenterBatchMode(conn) != 1)
+ pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+ dummy_params[0] = "1";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching first insert failed: %s\n", PQerrorMessage(conn));
+
+ if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
+ 1, dummy_param_oids, dummy_params,
+ NULL, NULL, 0) != 1)
+ pg_fatal("dispatching error select failed: %s\n", PQerrorMessage(conn));
+
+ dummy_params[0] = "2";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second insert failed: %s\n", PQerrorMessage(conn));
+
+ if (PQbatchSendQueue(conn) != 1)
+ pg_fatal("Sending first batch failed: %s\n", PQerrorMessage(conn));
+
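+ /* Queue a second batch containing a single valid insert */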
+ dummy_params[0] = "3";
+ if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
+ dummy_params, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching second-batch insert failed: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQbatchSendQueue(conn) != 1)
+ pg_fatal("Ending second batch failed: %s\n", PQerrorMessage(conn));
+
+ /*
+ * OK, start processing the batch results.
+ *
+ * We should get a command-ok for the first insert, a fatal error for the
+ * failing SELECT, a batch-aborted for the second insert, then a batch-end
+ * for the first batch; after that, a command-ok and a batch-end for the
+ * single insert in the second batch.
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("Unexpected result status %s: %s\n",
+ PQresStatus(PQresultStatus(res)),
+ PQresultErrorMessage(res));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s\n",
+ PQresStatus(PQresultStatus(res)));
+
+ /* Second query caused error, so we expect an error next */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_FATAL_ERROR)
+ pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s\n",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s\n",
+ PQresStatus(PQresultStatus(res)));
+
+ /*
+ * batch should now be aborted.
+ *
+ * Note that we could still queue more queries at this point if we wanted;
+ * they'd get added to a new third batch since we've already sent a
+ * second. The aborted flag relates only to the batch being received.
+ */
+ if (PQbatchStatus(conn) != PQBATCH_MODE_ABORTED)
+ pg_fatal("batch should be flagged as aborted but isn't\n");
+
+ /* third query in batch, the second insert */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_BATCH_ABORTED)
+ pg_fatal("Unexpected result code -- expected PGRES_BATCH_ABORTED, got %s\n",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* NULL result to signal end-of-results for this command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+ if (PQbatchStatus(conn) != PQBATCH_MODE_ABORTED)
+ pg_fatal("batch should be flagged as aborted but isn't\n");
+
+ /* Ensure we're still in batch */
+ if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+ pg_fatal("Fell out of batch mode somehow\n");
+
+ /*
+ * The end of a failed batch is a PGRES_BATCH_END.
+ *
+ * (This is so clients know to start processing results normally again and
+ * can tell the difference between skipped commands and the sync.)
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_BATCH_END)
+ pg_fatal("Unexpected result code from first batch sync\n"
+ "Expected PGRES_BATCH_END, got %s\n",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* XXX why do we have a NULL result after PGRES_BATCH_END? */
+ res = PQgetResult(conn);
+ if (res != NULL)
+ pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+ if (PQbatchStatus(conn) == PQBATCH_MODE_ABORTED)
+ pg_fatal("sync should've cleared the aborted flag but didn't\n");
+
+ /* We're still in a batch... */
+ if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+ pg_fatal("Fell out of batch mode somehow\n");
+
+ /* the insert from the second batch */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_fatal("Unexpected result code %s from first item in second batch\n",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ /* Read the NULL result at the end of the command */
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s\n", PQresStatus(PQresultStatus(res)));
+
+ /* the second batch sync */
+ if ((res = PQgetResult(conn)) == NULL)
+ pg_fatal("Unexpected NULL result: %s\n", PQerrorMessage(conn));
+ if (PQresultStatus(res) != PGRES_BATCH_END)
+ pg_fatal("Unexpected result code %s from second batch sync\n",
+ PQresStatus(PQresultStatus(res)));
+ PQclear(res);
+
+ if ((res = PQgetResult(conn)) != NULL)
+ pg_fatal("Expected null result, got %s: %s\n",
+ PQresStatus(PQresultStatus(res)),
+ PQerrorMessage(conn));
+
+ /* We're still in a batch... */
+ if (PQbatchStatus(conn) == PQBATCH_MODE_OFF)
+ pg_fatal("Fell out of batch mode somehow\n");
+
+ /* until we end it, which we can safely do now */
+ if (PQexitBatchMode(conn) != 1)
+ pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQbatchStatus(conn) != PQBATCH_MODE_OFF)
+ pg_fatal("exiting batch mode didn't seem to work\n");
+
+ fprintf(stderr, "ok\n");
+
+ /*-
+ * Since we fired the batches off without a surrounding xact, the results
+ * should be:
+ *
+ * - Implicit xact started by server around 1st batch
+ * - First insert applied
+ * - Second statement aborted xact
+ * - Third insert skipped
+ * - Sync rolled back first implicit xact
+ * - Implicit xact created by server around 2nd batch
+ * - insert applied from 2nd batch
+ * - Sync commits 2nd xact
+ *
+ * So we should only have the value 3 that we inserted.
+ */
+ res = PQexec(conn, "SELECT itemno FROM batch_demo");
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pg_fatal("Expected tuples, got %s: %s\n",
+ PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
+ if (PQntuples(res) != 1)
+ pg_fatal("expected 1 result, got %d\n", PQntuples(res));
+ for (i = 0; i < PQntuples(res); i++)
+ {
+ const char *val = PQgetvalue(res, i, 0);
+
+ if (strcmp(val, "3") != 0)
+ pg_fatal("expected only insert with value 3, got %s", val);
+ }
+
+ PQclear(res);
+}
+
+/* State machine enum for test_batch_insert */
+typedef enum BatchInsertStep
+{
+ BI_BEGIN_TX,
+ BI_DROP_TABLE,
+ BI_CREATE_TABLE,
+ BI_PREPARE,
+ BI_INSERT_ROWS,
+ BI_COMMIT_TX,
+ BI_SYNC,
+ BI_DONE
+} BatchInsertStep;
+
+static void
+test_batch_insert(PGconn *conn, int n_rows)
+{
+ const char *insert_params[1];
+ Oid insert_param_oids[1] = {INT4OID};
+ char insert_param_0[MAXINTLEN];
+ BatchInsertStep send_step = BI_BEGIN_TX,
+ recv_step = BI_BEGIN_TX;
+ int rows_to_send,
+ rows_to_receive;
+
+ insert_params[0] = &insert_param_0[0];
+
+ rows_to_send = rows_to_receive = n_rows;
+
+ /*
+ * Do a batched insert into a table created at the start of the batch
+ */
+ if (PQenterBatchMode(conn) != 1)
+ pg_fatal("failed to enter batch mode: %s\n", PQerrorMessage(conn));
+
+ while (send_step != BI_PREPARE)
+ {
+ const char *sql;
+
+ switch (send_step)
+ {
+ case BI_BEGIN_TX:
+ sql = "BEGIN TRANSACTION";
+ send_step = BI_DROP_TABLE;
+ break;
+
+ case BI_DROP_TABLE:
+ sql = drop_table_sql;
+ send_step = BI_CREATE_TABLE;
+ break;
+
+ case BI_CREATE_TABLE:
+ sql = create_table_sql;
+ send_step = BI_PREPARE;
+ break;
+
+ default:
+ pg_fatal("invalid state");
+ }
+
+ pg_debug("sending: %s\n", sql);
+ if (PQsendQueryParams(conn, sql,
+ 0, NULL, NULL, NULL, NULL, 0) != 1)
+ pg_fatal("dispatching %s failed: %s\n", sql, PQerrorMessage(conn));
+ }
+
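+ /* Prepare the insert statement once; it is executed repeatedly below */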
+ Assert(send_step == BI_PREPARE);
+ pg_debug("sending: %s\n", insert_sql);
+ if (PQsendPrepare(conn, "my_insert", insert_sql, 1, insert_param_oids) != 1)
+ pg_fatal("dispatching PREPARE failed: %s\n", PQerrorMessage(conn));
+ send_step = BI_INSERT_ROWS;
+
+ /*
+ * Now we start inserting. We'll be sending enough data that we could
+ * fill our output buffer, so to avoid deadlocking we must enter
+ * nonblocking mode and consume input while we send more output. As each
+ * query's results arrive we discard them, so the next query's results
+ * can be read. There's no need to finish the batch before processing
+ * results.
+ */
+ if (PQsetnonblocking(conn, 1) != 0)
+ pg_fatal("failed to set nonblocking mode: %s\n", PQerrorMessage(conn));
+
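+ /*
+ * Event loop: interleave receiving results with sending more rows, so
+ * that neither side blocks on a full buffer.
+ */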
+ while (recv_step != BI_DONE)
+ {
+ int sock;
+ fd_set input_mask;
+ fd_set output_mask;
+
+ sock = PQsocket(conn);
+
+ if (sock < 0)
+ break; /* shouldn't happen */
+
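+ /* Wait until the socket is ready for reading results or sending queries */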
+ FD_ZERO(&input_mask);
+ FD_SET(sock, &input_mask);
+ FD_ZERO(&output_mask);
+ FD_SET(sock, &output_mask);
+
+ if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
+ {
+ fprintf(stderr, "select() failed: %s\n", strerror(errno));
+ exit_nicely(conn);
+ }
+
+ /*
+ * Process any results, so we keep the server's out buffer free
+ * flowing and it can continue to process input
+ */
+ if (FD_ISSET(sock, &input_mask))
+ {
+ PQconsumeInput(conn);
+
+ /* Read until we'd block if we tried to read */
+ while (!PQisBusy(conn) && recv_step < BI_DONE)
+ {
+ PGresult *res;
+ const char *cmdtag;
+ const char *description = "";
+ int status;
+
+ /*
+ * Read the next result. A NULL result just marks the end of the
+ * current query's results; loop around to read the next one.
+ */
+ res = PQgetResult(conn);
+ if (res == NULL)
+ continue;
+
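+ /* Work out which result status and command tag to expect for this step */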
+ status = PGRES_COMMAND_OK;
+ switch (recv_step)
+ {
+ case BI_BEGIN_TX:
+ cmdtag = "BEGIN";
+ recv_step++;
+ break;
+ case BI_DROP_TABLE:
+ cmdtag = "DROP TABLE";
+ recv_step++;
+ break;
+ case BI_CREATE_TABLE:
+ cmdtag = "CREATE TABLE";
+ recv_step++;
+ break;
+ case BI_PREPARE:
+ cmdtag = "";
+ description = "PREPARE";
+ recv_step++;
+ break;
+ case BI_INSERT_ROWS:
+ cmdtag = "INSERT";
+ rows_to_receive--;
+ if (rows_to_receive == 0)
+ recv_step++;
+ break;
+ case BI_COMMIT_TX:
+ cmdtag = "COMMIT";
+ recv_step++;
+ break;
+ case BI_SYNC:
+ cmdtag = "";
+ description = "SYNC";
+ status = PGRES_BATCH_END;
+ recv_step++;
+ break;
+ case BI_DONE:
+ /* unreachable */
+ description = "";
+ abort();
+ }
+
+ if (PQresultStatus(res) != status)
+ pg_fatal("%s reported status %s, expected %s\n"
+ "Error message: \"%s\"\n",
+ description, PQresStatus(PQresultStatus(res)),
+ PQresStatus(status), PQerrorMessage(conn));
+
+ if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
+ pg_fatal("%s expected command tag '%s', got '%s'\n",
+ description, cmdtag, PQcmdStatus(res));
+
+ pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
+
+ PQclear(res);
+ }
+ }
+
+ /* Write more rows and/or the end batch message, if needed */
+ if (FD_ISSET(sock, &output_mask))
+ {
+ PQflush(conn);
+
+ if (send_step == BI_INSERT_ROWS)
+ {
+ snprintf(&insert_param_0[0], MAXINTLEN, "%d", rows_to_send);
+
+ if (PQsendQueryPrepared(conn, "my_insert",
+ 1, insert_params, NULL, NULL, 0) == 1)
+ {
+ pg_debug("sent row %d\n", rows_to_send);
+
+ rows_to_send--;
+ if (rows_to_send == 0)
+ send_step = BI_COMMIT_TX;
+ }
+ else
+ {
+ /*
+ * in nonblocking mode, so it's OK for an insert to fail
+ * to send
+ */
+ fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
+ rows_to_send, PQerrorMessage(conn));
+ }
+ }
+ else if (send_step == BI_COMMIT_TX)
+ {
+ if (PQsendQueryParams(conn, "COMMIT",
+ 0, NULL, NULL, NULL, NULL, 0) == 1)
+ {
+ pg_debug("sent COMMIT\n");
+ send_step = BI_SYNC;
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: failed to send commit: %s\n",
+ PQerrorMessage(conn));
+ }
+ }
+ else if (send_step == BI_SYNC)
+ {
+ if (PQbatchSendQueue(conn) == 1)
+ {
+ fprintf(stdout, "Dispatched end batch message\n");
+ send_step = BI_DONE;
+ }
+ else
+ {
+ fprintf(stderr, "WARNING: Ending a batch failed: %s\n",
+ PQerrorMessage(conn));
+ }
+ }
+ }
+ }
+
+ /* We've got the sync message and the batch should be done */
+ if (PQexitBatchMode(conn) != 1)
+ pg_fatal("attempt to exit batch mode failed when it should've succeeded: %s\n",
+ PQerrorMessage(conn));
+
+ if (PQsetnonblocking(conn, 0) != 0)
+ pg_fatal("failed to clear nonblocking mode: %s\n", PQerrorMessage(conn));
+}
+
+static void
+test_singlerowmode(PGconn *conn)
+{
+ PGresult *res;
+ int i;
+ bool batch_ended = false;
+
+ /* 1 batch, 3 queries in it */
+ if (PQenterBatchMode(conn) != 1)
+ pg_fatal("failed to enter batch mode: %s\n",
+ PQerrorMessage(conn));
+
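+ /*
+ * Dispatch three multi-row SELECTs; single-row mode is requested for
+ * the first two only when their results are read below.
+ */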
+ for (i = 0; i < 3; i++)
+ {
+ char *param[1];
+
+ param[0] = psprintf("%d", 44 + i);
+
+ if (PQsendQueryParams(conn,
+ "SELECT generate_series(42, $1)",
+ 1,
+ NULL,
+ (const char **) param,
+ NULL,
+ NULL,
+ 0) != 1)
+ pg_fatal("failed to send query: %s\n",
+ PQerrorMessage(conn));
+ pfree(param[0]);
+ }
+ if (PQbatchSendQueue(conn) != 1)
+ pg_fatal("Ending the batch failed: %s\n", PQerrorMessage(conn));
+
+ for (i = 0; !batch_ended; i++)
+ {
+ bool first = true;
+ bool saw_ending_tuplesok;
+ bool isSingleTuple = false;
+
+ /* Set single row mode for only first 2 SELECT queries */
+ if (i < 2)
+ {
+ if (PQsetSingleRowMode(conn) != 1)
+ pg_fatal("PQsetSingleRowMode() failed for i=%d\n", i);
+ }
+
+ /* Consume rows for this query */
+ saw_ending_tuplesok = false;
+ while ((res = PQgetResult(conn)) != NULL)
+ {
+ ExecStatusType est = PQresultStatus(res);
+
+ if (est == PGRES_BATCH_END)
+ {
+ fprintf(stderr, "end of batch reached\n");
+ batch_ended = true;
+ PQclear(res);
+ if (i != 3)
+ pg_fatal("Expected three results, got %d\n", i);
+ break;
+ }
+
+ /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
+ if (first)
+ {
+ if (i <= 1 && est != PGRES_SINGLE_TUPLE)
+ pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s\n",
+ i, PQresStatus(est));
+ if (i >= 2 && est != PGRES_TUPLES_OK)
+ pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s\n",
+ i, PQresStatus(est));
+ first = false;
+ }
+
+ fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
+ switch (est)
+ {
+ case PGRES_TUPLES_OK:
+ fprintf(stderr, ", tuples: %d\n", PQntuples(res));
+ saw_ending_tuplesok = true;
+ if (isSingleTuple)
+ {
+ if (PQntuples(res) == 0)
+ fprintf(stderr, "all tuples received in query %d\n", i);
+ else
+ pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, "
+ "but received PGRES_TUPLES_OK directly instead\n");
+ }
+ break;
+
+ case PGRES_SINGLE_TUPLE:
+ fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
+ isSingleTuple = true;
+ break;
+
+ default:
+ pg_fatal("unexpected result status %s\n", PQresStatus(est));
+ }
+ PQclear(res);
+ }
+ if (!batch_ended && !saw_ending_tuplesok)
+ pg_fatal("didn't get expected terminating TUPLES_OK\n");
+ }
+
+ if (PQexitBatchMode(conn) != 1)
+ pg_fatal("failed to end batch mode: %s\n", PQerrorMessage(conn));
+}
+
+static void
+usage(const char *progname)
+{
+ fprintf(stderr, "%s tests libpq's batch-mode.\n\n", progname);
+ fprintf(stderr, "Usage:\n");
+ fprintf(stderr, " %s testname [conninfo [number_of_rows]]\n", progname);
+ fprintf(stderr, "Tests:\n");
+ fprintf(stderr, " disallowed_in_batch|simple_batch|multi_batch|batch_abort|\n");
+ fprintf(stderr, " singlerow|batch_insert\n");
+}
+
+int
+main(int argc, char **argv)
+{
+ const char *conninfo = "";
+ PGconn *conn;
+ int numrows = 10000;
+
+ /*
+ * The testname parameter is mandatory; it can be followed by a conninfo
+ * string and number of rows.
+ */
+ if (argc < 2 || argc > 4)
+ {
+ usage(argv[0]);
+ exit(1);
+ }
+
+ if (argc >= 3)
+ conninfo = pg_strdup(argv[2]);
+
+ if (argc >= 4)
+ {
+ errno = 0;
+ numrows = strtol(argv[3], NULL, 10);
+ if (errno != 0 || numrows <= 0)
+ {
+ fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n", argv[3]);
+ exit(1);
+ }
+ }
+
+ /* Make a connection to the database */
+ conn = PQconnectdb(conninfo);
+ if (PQstatus(conn) != CONNECTION_OK)
+ {
+ fprintf(stderr, "Connection to database failed: %s\n",
+ PQerrorMessage(conn));
+ exit_nicely(conn);
+ }
+
+ if (strcmp(argv[1], "disallowed_in_batch") == 0)
+ test_disallowed_in_batch(conn);
+ else if (strcmp(argv[1], "simple_batch") == 0)
+ test_simple_batch(conn);
+ else if (strcmp(argv[1], "multi_batch") == 0)
+ test_multi_batch(conn);
+ else if (strcmp(argv[1], "batch_abort") == 0)
+ test_batch_abort(conn);
+ else if (strcmp(argv[1], "batch_insert") == 0)
+ test_batch_insert(conn, numrows);
+ else if (strcmp(argv[1], "singlerow") == 0)
+ test_singlerowmode(conn);
+ else
+ {
+ fprintf(stderr, "\"%s\" is not a recognized test name\n", argv[1]);
+ usage(argv[0]);
+ exit(1);
+ }
+
+ /* close the connection to the database and cleanup */
+ PQfinish(conn);
+ return 0;
+}
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd41b..634e48ec85 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -50,7 +50,8 @@ my @contrib_excludes = (
'pgcrypto', 'sepgsql',
'brin', 'test_extensions',
'test_misc', 'test_pg_dump',
- 'snapshot_too_old', 'unsafe_tests');
+ 'snapshot_too_old', 'unsafe_tests',
+ 'test_libpq');
# Set of variables for frontend modules
my $frontend_defines = { 'initdb' => 'FRONTEND' };
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 03c4e0fe5b..9b75db962b 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -215,6 +215,7 @@ BackgroundWorkerHandle
BackgroundWorkerSlot
Barrier
BaseBackupCmd
+BatchInsertStep
BeginDirectModify_function
BeginForeignInsert_function
BeginForeignModify_function
@@ -1544,6 +1545,7 @@ PG_Locale_Strategy
PG_Lock_Status
PG_init_t
PGcancel
+PGcommandQueueEntry
PGconn
PGdataValue
PGlobjfuncs
@@ -1656,6 +1658,7 @@ PMSignalReason
PMState
POLYGON
PQArgBlock
+PQBatchStatus
PQEnvironmentOption
PQExpBuffer
PQExpBufferData