From: | Boris Kolpackov <boris(at)codesynthesis(dot)com> |
---|---|
To: | Alvaro Herrera <alvaro(dot)herrera(at)2ndquadrant(dot)com> |
Cc: | pgsql-hackers(at)lists(dot)postgresql(dot)org |
Subject: | Re: Pipeline mode and PQpipelineSync() |
Date: | 2021-07-07 15:09:41 |
Message-ID: | boris.20210707170157@codesynthesis.com |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-hackers |
Alvaro Herrera <alvaro(dot)herrera(at)2ndquadrant(dot)com> writes:
> On 2021-Jul-07, Boris Kolpackov wrote:
>
> > I don't get any difference in behavior with this patch. That is, I
> > still get the unexpected NULL result. Does it make a difference for
> > your reproducer?
>
> Yes, the behavior changes for my repro. Is it possible for you to
> share a full program I can compile and run, plesse?
Here is the test sans the connection setup:
-----------------------------------------------------------------------
#include <libpq-fe.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <stddef.h>
#include <assert.h>
#include <sys/select.h>
// Note: hack.
//
#include <arpa/inet.h>
#define htonll(x) ((((long long)htonl(x)) << 32) + htonl((x) >> 32))
static const size_t columns = 3;
struct data
{
long long id;
long long idata;
const char* sdata;
};
static char* values[columns];
static int lengths[columns];
static int formats[columns] = {1, 1, 1};
static const unsigned int types[columns] = {
20, // int8
20, // int8
25 // text
};
static void
init (const struct data* d)
{
values[0] = (char*)&d->id;
lengths[0] = sizeof (d->id);
values[1] = (char*)&d->idata;
lengths[1] = sizeof (d->idata);
values[2] = (char*)d->sdata;
lengths[2] = strlen (d->sdata);
}
static void
execute (PGconn* conn, const struct data* ds, size_t n)
{
int sock = PQsocket (conn);
assert (sock != -1);
if (PQsetnonblocking (conn, 1) == -1 ||
PQenterPipelineMode (conn) == 0)
assert (false);
// True if we've written and read everything, respectively.
//
bool wdone = false;
bool rdone = false;
size_t wn = 0;
size_t rn = 0;
while (!rdone)
{
fd_set wds;
if (!wdone)
{
FD_ZERO (&wds);
FD_SET (sock, &wds);
}
fd_set rds;
FD_ZERO (&rds);
FD_SET (sock, &rds);
if (select (sock + 1, &rds, wdone ? NULL : &wds, NULL, NULL) == -1)
{
if (errno == EINTR)
continue;
assert (false);
}
// Try to minimize the chance of blocking the server by first processing
// the result and then sending more queries.
//
if (FD_ISSET (sock, &rds))
{
if (PQconsumeInput (conn) == 0)
assert (false);
while (PQisBusy (conn) == 0)
{
//fprintf (stderr, "PQgetResult %zu\n", rn);
PGresult* res = PQgetResult (conn);
assert (res != NULL);
ExecStatusType stat = PQresultStatus (res);
if (stat == PGRES_PIPELINE_SYNC)
{
assert (wdone && rn == n);
PQclear (res);
rdone = true;
break;
}
if (stat == PGRES_FATAL_ERROR)
{
const char* s = PQresultErrorField (res, PG_DIAG_SQLSTATE);
if (strcmp (s, "23505") == 0)
fprintf (stderr, "duplicate id at %zu\n", rn);
}
PQclear (res);
assert (rn != n);
++rn;
// We get a NULL result after each query result.
//
{
PGresult* end = PQgetResult (conn);
assert (end == NULL);
}
}
}
if (!wdone && FD_ISSET (sock, &wds))
{
// Send queries until we get blocked (write-biased). This feels like
// a better overall strategy to keep the server busy compared to
// sending one query at a time and then re-checking if there is
// anything to read because the results of INSERT/UPDATE/DELETE are
// presumably small and quite a few of them can get buffered before
// the server gets blocked.
//
for (;;)
{
if (wn != n)
{
//fprintf (stderr, "PQsendQueryPrepared %zu\n", wn);
init (ds + wn);
if (PQsendQueryPrepared (conn,
"persist_object",
(int)(columns),
values,
lengths,
formats,
1) == 0)
assert (false);
if (++wn == n)
{
if (PQpipelineSync (conn) == 0)
assert (false);
}
}
// PQflush() result:
//
// 0 -- success (queue is now empty)
// 1 -- blocked
// -1 -- error
//
int r = PQflush (conn);
assert (r != -1);
if (r == 0)
{
if (wn != n)
{
// If we continue here, then we are write-biased. And if we
// break, then we are read-biased.
//
#if 1
break;
#else
continue;
#endif
}
wdone = true;
}
break; // Blocked or done.
}
}
}
if (PQexitPipelineMode (conn) == 0 ||
PQsetnonblocking (conn, 0) == -1)
assert (false);
}
static void
test (PGconn* conn)
{
const size_t batch = 500;
struct data ds[batch];
for (size_t i = 0; i != batch; ++i)
{
ds[i].id = htonll (i == batch / 2 ? i - 1 : i); // Cause duplicate PK.
ds[i].idata = htonll (i);
ds[i].sdata = "abc";
}
// Prepare the statement.
//
{
PGresult* res = PQprepare (
conn,
"persist_object",
"INSERT INTO \"pgsql_bulk_object\" "
"(\"id\", "
"\"idata\", "
"\"sdata\") "
"VALUES "
"($1, $2, $3)",
(int)(columns),
types);
assert (PQresultStatus (res) == PGRES_COMMAND_OK);
PQclear (res);
}
// Begin transaction.
//
{
PGresult* res = PQexec (conn, "begin");
assert (PQresultStatus (res) == PGRES_COMMAND_OK);
PQclear (res);
}
execute (conn, ds, batch);
// Commit transaction.
//
{
PGresult* res = PQexec (conn, "commit");
assert (PQresultStatus (res) == PGRES_COMMAND_OK);
PQclear (res);
}
}
-----------------------------------------------------------------------
Use the following statements to (re)create the table:
DROP TABLE IF EXISTS "pgsql_bulk_object" CASCADE;
CREATE TABLE "pgsql_bulk_object" (
"id" BIGINT NOT NULL PRIMARY KEY,
"idata" BIGINT NOT NULL,
"sdata" TEXT NOT NULL);
It fails consistently for me when running against the local PostgreSQL
9.5 server (connecting via the UNIX socket):
duplicate id at 250
driver: driver.cxx:105: void execute(PGconn*, const data*, size_t): Assertion `res != NULL' failed.
From | Date | Subject | |
---|---|---|---|
Next Message | Tom Lane | 2021-07-07 15:14:53 | Re: PostgreSQL-13.3 parser.y with positional references by named references |
Previous Message | Jacob Champion | 2021-07-07 15:07:14 | Re: [PATCH] Pull general SASL framework out of SCRAM |