Revised nonblocking patches + quasi docs

From: Alfred Perlstein <bright(at)wintelcom(dot)net>
To: Tom Lane <tgl(at)sss(dot)pgh(dot)pa(dot)us>
Cc: Don Baccus <dhogaza(at)pacifier(dot)com>, pgsql-hackers(at)postgreSQL(dot)org, scrappy(at)hub(dot)org
Subject: Revised nonblocking patches + quasi docs
Date: 2000-01-14 19:14:30
Message-ID: 20000114111430.A824@fw.wintelcom.net
Lists: pgsql-hackers

* Tom Lane <tgl(at)sss(dot)pgh(dot)pa(dot)us> [000109 08:18] wrote:
> Don Baccus <dhogaza(at)pacifier(dot)com> writes:
> > At 05:27 PM 1/8/00 -0500, Tom Lane wrote:
> >> I also object strongly to the lack of documentation.
>
> > ... I know there are some folks who aren't native-english speakers, so
> > perhaps you don't want to require that the implementor of such patches
> > provide the final documentation wording. But the information should
> > be there and spelled out in a form that can be very easily moved to
> > the docs.
>
> Oh, absolutely. Thomas, our master of the docs, has always had the
> policy of "give me some words, I'll take care of formatting and
> editing..."
>
> I was probably too harsh on Alfred last night, since in fact his code
> was fairly well commented, and some minimal doco could have been
> extracted from the routine headers. But on a change like this, I think
> some paragraphs of coherent high-level explanation are needed: what it
> does, when and why you'd use it. I didn't see that anywhere...

I've actually been trying to work on the sgml and failing miserably;
I have no clue how this stuff works (sgml compilation). Are you asking
for a couple of paragraphs that describe the proposed changes?

If so, I hope this suffices; if not, some help on building the sgml
would be much appreciated:

--------

Summary:

First and foremost, great pains have been taken so that there
are _no_ compatibility issues.

A 6.5.3 libpq program should not behave any differently
with these patches in place; all they do is offer a step closer
to a truly non-blocking connection to the backend and address
some issues with non-blocking connections.

----

Added functions:

int PQisnonblocking(const PGconn *conn);

returns whether or not the connection is in non-blocking mode. However,
it doesn't actually check the socket flags; it relies on the user
calling 'PQsetnonblocking()' to keep the internal state of libpq
sane. Users should no longer use 'PQsocket()' to retrieve the
socket and 'manually' ioctl/fcntl it to non-blocking.

returns TRUE if the socket has been set to non-blocking mode, FALSE
if the socket is blocking.

int PQflush(PGconn *conn);

flushes the send-queue to the backend. This just makes the internal
function visible to the user for convenience; like the internal
function, it returns 0 for success, EOF for any failure.

int PQsetnonblocking(PGconn *conn, int arg);

actually sets the connection to the backend to blocking or
non-blocking. arg should be set to TRUE to set the connection to
non-blocking, or FALSE to set it to blocking.

there's an implied blocking flush of the send-queue, which is
really ok as the user is either 'going into' or 'returning from'
a blocking state.

returns 0 for success, -1 for failure.
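
On non-Windows builds the patch below changes the socket mode by reading the
current flags with fcntl(F_GETFL) and writing them back with O_NONBLOCK added
or cleared, so other flags are preserved. A minimal standalone sketch of that
toggle follows; it works on a plain file descriptor rather than a PGconn, and
the helper names set_fd_nonblocking() and fd_is_nonblocking() are hypothetical,
not part of libpq.

```c
#include <assert.h>
#include <fcntl.h>

/*
 * Hypothetical helper: switch a descriptor between blocking and
 * non-blocking mode while preserving its other status flags.
 * Returns 0 for success, -1 for failure (errno is set by fcntl).
 */
static int
set_fd_nonblocking(int fd, int nonblocking)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1)
        return -1;

    if (nonblocking)
        flags |= O_NONBLOCK;
    else
        flags &= ~O_NONBLOCK;

    return (fcntl(fd, F_SETFL, flags) == -1) ? -1 : 0;
}

/*
 * Hypothetical helper: ask the kernel for the real mode.
 * Returns 1 if O_NONBLOCK is set, 0 if not, -1 on error.
 */
static int
fd_is_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1)
        return -1;
    return (flags & O_NONBLOCK) ? 1 : 0;
}
```

Unlike fd_is_nonblocking() here, PQisnonblocking() as described above reports
libpq's own record of the mode, which is why changing the socket behind
libpq's back would leave the two out of step.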

---

New functionality:

PQsetnonblocking() allows libpq to know what behavior the user really
wants: in non-blocking mode the user will not block while sending data
to the backend. Without it, if I had a constant stream of data and was
doing a COPY IN, it could potentially never finish, because unless the
backend lost the connection I would block while sending until the
backend could take more data.

---

Implementation changes:

none should be visible to programs based on 6.5.3's libpq.

programs based on later versions of libpq will notice that
the non-blocking connection functions will set the state of
the connection to non-blocking automatically.

when the connection is set non-blocking, pqFlush() will not block
if the send queue would be filled by new data inserted into the
queue.

functions that poll for data from the backend implicitly _try_ to
flush the send queue if the connection is set to non-blocking. This
allows the polling to act as a context for pushing queued data to the
backend.
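
The flush behavior described here (write what the socket will take, keep the
rest queued, never wait) can be sketched outside libpq as follows. This is
only an illustration under stated assumptions: SendQueue and flush_queue() are
hypothetical names, the descriptor is assumed to already be in non-blocking
mode, and the sketch returns 1 for "would block" where the patch's pqFlush()
returns EOF. It also restarts the write on EINTR, as the patch does.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>      /* O_NONBLOCK, for callers that set up the descriptor */
#include <string.h>
#include <unistd.h>

/* Hypothetical stand-in for the send-queue fields of PGconn. */
typedef struct
{
    char   *data;       /* queued bytes */
    size_t  count;      /* number of bytes currently queued */
} SendQueue;

/*
 * Try to write the whole queue to fd without waiting.
 * Returns 0 if the queue was emptied, 1 if the descriptor would block
 * (unsent bytes are moved to the front of the queue), -1 on a hard error.
 */
static int
flush_queue(int fd, SendQueue *q)
{
    char   *ptr = q->data;
    size_t  len = q->count;
    int     status = 0;

    while (len > 0)
    {
        ssize_t sent = write(fd, ptr, len);

        if (sent < 0)
        {
            if (errno == EINTR)
                continue;       /* interrupted by a signal: restart */
            status = (errno == EAGAIN || errno == EWOULDBLOCK) ? 1 : -1;
            break;
        }
        ptr += sent;
        len -= (size_t) sent;
    }

    /* shift whatever was not sent to the front of the queue */
    if (len > 0 && ptr != q->data)
        memmove(q->data, ptr, len);
    q->count = len;
    return status;
}
```

A caller that polls for input would call flush_queue() on each pass until it
returns 0, which is the role the polling functions play in the description
above.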

---

Problems:

We need some sort of send-queue commit reservations so that
there's no chance of us sending a partial query down the pipe
to the backend. Right now this is hacked around by potentially
blocking in non-blocking mode if something 'goes terribly wrong'.
I plan to fix this.
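
One possible shape for such a reservation, offered only as a sketch and not as
what the patch implements: check that the whole message fits before queuing
any of it, so a failure never leaves half a query in the buffer. OutBuffer and
queue_query_reserved() are hypothetical names; the 8 kB size mirrors the
outBufSize set in the diff below, and the sketch assumes the query is queued
with its terminating NUL after the 'Q' tag.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the output buffer fields of PGconn. */
typedef struct
{
    char    buf[8 * 1024];  /* same size as outBufSize in the patch */
    size_t  count;          /* bytes already queued */
} OutBuffer;

/*
 * Queue the 'Q' tag and the NUL-terminated query only if the whole
 * message fits in the remaining space.
 * Returns 0 for success, -1 if it does not fit (nothing is queued).
 */
static int
queue_query_reserved(OutBuffer *out, const char *query)
{
    size_t qlen = strlen(query) + 1;    /* include the terminating NUL */
    size_t need = 1 + qlen;             /* 'Q' tag plus the query */

    assert(out->count <= sizeof(out->buf));

    /* reserve first: refuse before touching the buffer */
    if (need > sizeof(out->buf) - out->count)
        return -1;

    out->buf[out->count++] = 'Q';
    memcpy(out->buf + out->count, query, qlen);
    out->count += qlen;
    return 0;
}
```

On -1 the caller can flush and retry later, knowing the backend has not seen a
partial message.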

---

Quirks:

PQexec() assumes the caller wants blocking behavior and will set the
connection to blocking for the duration of the PQexec() call; it will
then restore the previous mode.
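
The save/force/restore pattern behind this quirk can be sketched on its own.
Everything here is a hypothetical stand-in (Conn, set_nonblocking(),
run_blocking(), exec_blocking()); it only illustrates that the caller's mode
is put back on both the success and the failure path.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a connection that tracks its own mode. */
typedef struct
{
    int nonblocking;    /* 1 if non-blocking, 0 if blocking */
} Conn;

/* Hypothetical mode switch: 0 for success, -1 for failure. */
static int
set_nonblocking(Conn *c, int arg)
{
    c->nonblocking = arg ? 1 : 0;
    return 0;
}

/* Hypothetical blocking operation: a result string, or NULL on failure. */
static const char *
run_blocking(Conn *c, const char *query)
{
    assert(!c->nonblocking);    /* must only run in blocking mode */
    return (query != NULL && query[0] != '\0') ? "ok" : NULL;
}

/* Force blocking mode for the call, then restore the caller's mode. */
static const char *
exec_blocking(Conn *c, const char *query)
{
    int         saved = c->nonblocking;
    const char *result;

    if (set_nonblocking(c, 0) == -1)
        return NULL;

    result = run_blocking(c, query);    /* may be NULL on failure */

    /* restore on both the success and the failure path */
    if (set_nonblocking(c, saved) == -1)
        return NULL;
    return result;
}
```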

---

Internal changes:

new field in PGconn 'int nonblocking' set to 1 if the connection is
nonblocking, 0 if blocking (default)

macro pqIsnonblocking(PGconn) to avoid a function call to check blocking
status (only visible if libpq-int.h is included)

the internal function connectMakeNonblocking() has been replaced with
PQsetnonblocking()

restart a send if EINTR is reported during a flush.

---

Lastly:

This is work in progress, I will be working towards making libpq
better at not blocking.

here are the diffs:

Index: fe-connect.c
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-connect.c,v
retrieving revision 1.109
diff -u -c -IHeader -I$Id: -r1.109 fe-connect.c
cvs diff: conflicting specifications of output style
*** fe-connect.c 2000/01/14 05:33:15 1.109
--- fe-connect.c 2000/01/14 18:36:54
***************
*** 594,624 ****
return 0;
}

-
- /* ----------
- * connectMakeNonblocking -
- * Make a connection non-blocking.
- * Returns 1 if successful, 0 if not.
- * ----------
- */
- static int
- connectMakeNonblocking(PGconn *conn)
- {
- #ifndef WIN32
- if (fcntl(conn->sock, F_SETFL, O_NONBLOCK) < 0)
- #else
- if (ioctlsocket(conn->sock, FIONBIO, &on) != 0)
- #endif
- {
- printfPQExpBuffer(&conn->errorMessage,
- "connectMakeNonblocking -- fcntl() failed: errno=%d\n%s\n",
- errno, strerror(errno));
- return 0;
- }
-
- return 1;
- }
-
/* ----------
* connectNoDelay -
* Sets the TCP_NODELAY socket option.
--- 594,599 ----
***************
*** 789,795 ****
* Ewan Mellor <eem21(at)cam(dot)ac(dot)uk>.
* ---------- */
#if (!defined(WIN32) || defined(WIN32_NON_BLOCKING_CONNECTIONS)) && !defined(USE_SSL)
! if (!connectMakeNonblocking(conn))
goto connect_errReturn;
#endif

--- 764,770 ----
* Ewan Mellor <eem21(at)cam(dot)ac(dot)uk>.
* ---------- */
#if (!defined(WIN32) || defined(WIN32_NON_BLOCKING_CONNECTIONS)) && !defined(USE_SSL)
! if (PQsetnonblocking(conn, TRUE) != 0)
goto connect_errReturn;
#endif

***************
*** 898,904 ****
/* This makes the connection non-blocking, for all those cases which forced us
not to do it above. */
#if (defined(WIN32) && !defined(WIN32_NON_BLOCKING_CONNECTIONS)) || defined(USE_SSL)
! if (!connectMakeNonblocking(conn))
goto connect_errReturn;
#endif

--- 873,879 ----
/* This makes the connection non-blocking, for all those cases which forced us
not to do it above. */
#if (defined(WIN32) && !defined(WIN32_NON_BLOCKING_CONNECTIONS)) || defined(USE_SSL)
! if (PQsetnonblocking(conn, TRUE) != 0)
goto connect_errReturn;
#endif

***************
*** 1702,1707 ****
--- 1677,1683 ----
conn->inBuffer = (char *) malloc(conn->inBufSize);
conn->outBufSize = 8 * 1024;
conn->outBuffer = (char *) malloc(conn->outBufSize);
+ conn->nonblocking = FALSE;
initPQExpBuffer(&conn->errorMessage);
initPQExpBuffer(&conn->workBuffer);
if (conn->inBuffer == NULL ||
***************
*** 1812,1817 ****
--- 1788,1794 ----
conn->lobjfuncs = NULL;
conn->inStart = conn->inCursor = conn->inEnd = 0;
conn->outCount = 0;
+ conn->nonblocking = FALSE;

}

Index: fe-exec.c
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-exec.c,v
retrieving revision 1.86
diff -u -c -IHeader -I$Id: -r1.86 fe-exec.c
cvs diff: conflicting specifications of output style
*** fe-exec.c 1999/11/11 00:10:14 1.86
--- fe-exec.c 2000/01/14 22:47:07
***************
*** 13,18 ****
--- 13,19 ----
*/
#include <errno.h>
#include <ctype.h>
+ #include <fcntl.h>

#include "postgres.h"
#include "libpq-fe.h"
***************
*** 24,30 ****
#include <unistd.h>
#endif

-
/* keep this in same order as ExecStatusType in libpq-fe.h */
const char *const pgresStatus[] = {
"PGRES_EMPTY_QUERY",
--- 25,30 ----
***************
*** 514,526 ****
conn->curTuple = NULL;

/* send the query to the backend; */
! /* the frontend-backend protocol uses 'Q' to designate queries */
! if (pqPutnchar("Q", 1, conn) ||
! pqPuts(query, conn) ||
! pqFlush(conn))
{
! handleSendFailure(conn);
! return 0;
}

/* OK, it's launched! */
--- 514,566 ----
conn->curTuple = NULL;

/* send the query to the backend; */
!
! /*
! * in order to guarantee that we don't send a partial query
! * where we would become out of sync with the backend and/or
! * block during a non-blocking connection we must first flush
! * the send buffer before sending more data
! *
! * an alternative is to implement 'queue reservations' where
! * we are able to roll up a transaction
! * (the 'Q' along with our query) and make sure we have
! * enough space for it all in the send buffer.
! */
! if (pqIsnonblocking(conn))
{
! /*
! * the buffer must have emptied completely before we allow
! * a new query to be buffered
! */
! if (pqFlush(conn))
! return 0;
! /* 'Q' == queries */
! /* XXX: if we fail here we really ought to not block */
! if (pqPutnchar("Q", 1, conn) ||
! pqPuts(query, conn))
! {
! handleSendFailure(conn);
! return 0;
! }
! /*
! * give the data a push, ignore the return value as
! * ConsumeInput() will do any additional flushing if needed
! */
! (void) pqFlush(conn);
! }
! else
! {
! /*
! * the frontend-backend protocol uses 'Q' to
! * designate queries
! */
! if (pqPutnchar("Q", 1, conn) ||
! pqPuts(query, conn) ||
! pqFlush(conn))
! {
! handleSendFailure(conn);
! return 0;
! }
}

/* OK, it's launched! */
***************
*** 574,580 ****
--- 614,630 ----
* we will NOT block waiting for more input.
*/
if (pqReadData(conn) < 0)
+ {
+ /*
+ * for non-blocking connections
+ * try to flush the send-queue otherwise we may never get a
+ * response for something that may not have already been sent
+ * because it's in our write buffer!
+ */
+ if (pqIsnonblocking(conn))
+ (void) pqFlush(conn);
return 0;
+ }
/* Parsing of the data waits till later. */
return 1;
}
***************
*** 1088,1093 ****
--- 1138,1153 ----
{
PGresult *result;
PGresult *lastResult;
+ bool savedblocking;
+
+ /*
+ * we assume anyone calling PQexec wants blocking behaviour,
+ * we force the blocking status of the connection to blocking
+ * for the duration of this function and restore it on return
+ */
+ savedblocking = pqIsnonblocking(conn);
+ if (PQsetnonblocking(conn, FALSE) == -1)
+ return NULL;

/*
* Silently discard any prior query result that application didn't
***************
*** 1102,1115 ****
PQclear(result);
printfPQExpBuffer(&conn->errorMessage,
"PQexec: you gotta get out of a COPY state yourself.\n");
! return NULL;
}
PQclear(result);
}

/* OK to send the message */
if (!PQsendQuery(conn, query))
! return NULL;

/*
* For backwards compatibility, return the last result if there are
--- 1162,1176 ----
PQclear(result);
printfPQExpBuffer(&conn->errorMessage,
"PQexec: you gotta get out of a COPY state yourself.\n");
! /* restore blocking status */
! goto errout;
}
PQclear(result);
}

/* OK to send the message */
if (!PQsendQuery(conn, query))
! goto errout; /* restore blocking status */

/*
* For backwards compatibility, return the last result if there are
***************
*** 1142,1148 ****
--- 1203,1217 ----
result->resultStatus == PGRES_COPY_OUT)
break;
}
+
+ if (PQsetnonblocking(conn, savedblocking) == -1)
+ return NULL;
return lastResult;
+
+ errout:
+ if (PQsetnonblocking(conn, savedblocking) == -1)
+ return NULL;
+ return NULL;
}


***************
*** 1431,1438 ****
"PQendcopy() -- I don't think there's a copy in progress.\n");
return 1;
}

! (void) pqFlush(conn); /* make sure no data is waiting to be sent */

/* Return to active duty */
conn->asyncStatus = PGASYNC_BUSY;
--- 1500,1516 ----
"PQendcopy() -- I don't think there's a copy in progress.\n");
return 1;
}
+
+ /*
+ * make sure no data is waiting to be sent,
+ * abort if we are non-blocking and the flush fails
+ */
+ if (pqFlush(conn) && pqIsnonblocking(conn))
+ return (1);

! /* non blocking connections may have to abort at this point. */
! if (pqIsnonblocking(conn) && PQisBusy(conn))
! return (1);

/* Return to active duty */
conn->asyncStatus = PGASYNC_BUSY;
***************
*** 2025,2028 ****
--- 2103,2192 ----
return 1;
else
return 0;
+ }
+
+ /* PQsetnonblocking:
+ sets the PGconn's database connection non-blocking if the arg is TRUE
+ or makes it blocking if the arg is FALSE, this will not protect
+ you from PQexec(), you'll only be safe when using the non-blocking
+ API
+ Needs to be called only on a connected database connection.
+ */
+
+ int
+ PQsetnonblocking(PGconn *conn, int arg)
+ {
+ int fcntlarg;
+
+ arg = (arg == TRUE) ? 1 : 0;
+ /* early out if the socket is already in the state requested */
+ if (arg == conn->nonblocking)
+ return (0);
+
+ /*
+ * to guarantee constancy for flushing/query/result-polling behavior
+ * we need to flush the send queue at this point in order to guarantee
+ * proper behavior.
+ * this is ok because either they are making a transition
+ * _from_ or _to_ blocking mode, either way we can block them.
+ */
+ /* if we are going from blocking to non-blocking flush here */
+ if (!pqIsnonblocking(conn) && pqFlush(conn))
+ return (-1);
+
+
+ #ifdef USE_SSL
+ if (conn->ssl)
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ "PQsetnonblocking() -- not supported when using SSL\n");
+ return (-1);
+ }
+ #endif /* USE_SSL */
+
+ #ifndef WIN32
+ fcntlarg = fcntl(conn->sock, F_GETFL, 0);
+ if (fcntlarg == -1)
+ return (-1);
+
+ if ((arg == TRUE &&
+ fcntl(conn->sock, F_SETFL, fcntlarg | O_NONBLOCK) == -1) ||
+ (arg == FALSE &&
+ fcntl(conn->sock, F_SETFL, fcntlarg & ~O_NONBLOCK) == -1))
+ #else
+ fcntlarg = arg;
+ if (ioctlsocket(conn->sock, FIONBIO, &fcntlarg) != 0)
+ #endif
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ "PQsetnonblocking() -- unable to set nonblocking status to %s\n",
+ arg == TRUE ? "TRUE" : "FALSE");
+ return (-1);
+ }
+
+ conn->nonblocking = arg;
+
+ /* if we are going from non-blocking to blocking flush here */
+ if (pqIsnonblocking(conn) && pqFlush(conn))
+ return (-1);
+
+ return (0);
+ }
+
+ /* return the blocking status of the database connection, TRUE == nonblocking,
+ FALSE == blocking
+ */
+ int
+ PQisnonblocking(const PGconn *conn)
+ {
+
+ return (pqIsnonblocking(conn));
+ }
+
+ /* try to force data out, really only useful for non-blocking users */
+ int
+ PQflush(PGconn *conn)
+ {
+
+ return (pqFlush(conn));
}
Index: fe-misc.c
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/fe-misc.c,v
retrieving revision 1.33
diff -u -c -IHeader -I$Id: -r1.33 fe-misc.c
cvs diff: conflicting specifications of output style
*** fe-misc.c 1999/11/30 03:08:19 1.33
--- fe-misc.c 2000/01/12 03:12:14
***************
*** 86,91 ****
--- 86,122 ----
{
size_t avail = Max(conn->outBufSize - conn->outCount, 0);

+ /*
+ * if we are non-blocking and the send queue is too full to buffer this
+ * request then try to flush some and return an error
+ */
+ if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
+ {
+ /*
+ * even if the flush failed we may still have written some
+ * data, recalculate the size of the send-queue relative
+ * to the amount we have to send, we may be able to queue it
+ * after all even though it's not sent to the database it's
+ * ok, any routines that check the data coming from the
+ * database better call pqFlush() anyway.
+ */
+ if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
+ {
+ printfPQExpBuffer(&conn->errorMessage,
+ "pqPutBytes -- pqFlush couldn't flush enough"
+ " data: space available: %d, space needed %d\n",
+ Max(conn->outBufSize - conn->outCount, 0), nbytes);
+ return EOF;
+ }
+ }
+
+ /*
+ * if the amount of data to be sent is larger than the size of the
+ * output buffer then we must flush it to make more room.
+ *
+ * the code above will make sure the loop conditional is never
+ * true for non-blocking connections
+ */
while (nbytes > avail)
{
memcpy(conn->outBuffer + conn->outCount, s, avail);
***************
*** 548,553 ****
--- 579,592 ----
return EOF;
}

+ /*
+ * don't try to send zero data, allows us to use this function
+ * without too much worry about overhead
+ */
+ if (len == 0)
+ return (0);
+
+ /* while there's still data to send */
while (len > 0)
{
/* Prevent being SIGPIPEd if backend has closed the connection. */
***************
*** 556,561 ****
--- 595,601 ----
#endif

int sent;
+
#ifdef USE_SSL
if (conn->ssl)
sent = SSL_write(conn->ssl, ptr, len);
***************
*** 585,590 ****
--- 625,632 ----
case EWOULDBLOCK:
break;
#endif
+ case EINTR:
+ continue;

case EPIPE:
#ifdef ECONNRESET
***************
*** 616,628 ****
ptr += sent;
len -= sent;
}
if (len > 0)
{
/* We didn't send it all, wait till we can send more */

- /* At first glance this looks as though it should block. I think
- * that it will be OK though, as long as the socket is
- * non-blocking. */
if (pqWait(FALSE, TRUE, conn))
return EOF;
}
--- 658,688 ----
ptr += sent;
len -= sent;
}
+
if (len > 0)
{
/* We didn't send it all, wait till we can send more */
+
+ /*
+ * if the socket is in non-blocking mode we may need
+ * to abort here
+ */
+ #ifdef USE_SSL
+ /* can't do anything for our SSL users yet */
+ if (conn->ssl == NULL)
+ {
+ #endif
+ if (pqIsnonblocking(conn))
+ {
+ /* shift the contents of the buffer */
+ memmove(conn->outBuffer, ptr, len);
+ conn->outCount = len;
+ return EOF;
+ }
+ #ifdef USE_SSL
+ }
+ #endif

if (pqWait(FALSE, TRUE, conn))
return EOF;
}
Index: libpq-fe.h
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-fe.h,v
retrieving revision 1.54
diff -u -c -IHeader -I$Id: -r1.54 libpq-fe.h
cvs diff: conflicting specifications of output style
*** libpq-fe.h 2000/01/14 05:33:15 1.54
--- libpq-fe.h 2000/01/14 22:45:33
***************
*** 261,266 ****
--- 261,273 ----
extern int PQgetlineAsync(PGconn *conn, char *buffer, int bufsize);
extern int PQputnbytes(PGconn *conn, const char *buffer, int nbytes);
extern int PQendcopy(PGconn *conn);
+
+ /* Set blocking/nonblocking connection to the backend */
+ extern int PQsetnonblocking(PGconn *conn, int arg);
+ extern int PQisnonblocking(const PGconn *conn);
+
+ /* Force the write buffer to be written (or at least try) */
+ extern int PQflush(PGconn *conn);

/*
* "Fast path" interface --- not really recommended for application
Index: libpq-int.h
===================================================================
RCS file: /home/pgcvs/pgsql/src/interfaces/libpq/libpq-int.h,v
retrieving revision 1.15
diff -u -c -IHeader -I$Id: -r1.15 libpq-int.h
cvs diff: conflicting specifications of output style
*** libpq-int.h 2000/01/14 05:33:15 1.15
--- libpq-int.h 2000/01/14 18:32:51
***************
*** 214,219 ****
--- 214,222 ----
int inEnd; /* offset to first position after avail
* data */

+ int nonblocking; /* whether this connection is using a blocking
+ * socket to the backend or not */
+
/* Buffer for data not yet sent to backend */
char *outBuffer; /* currently allocated buffer */
int outBufSize; /* allocated size of buffer */
***************
*** 297,301 ****
--- 300,310 ----
#define strerror(A) (sys_errlist[(A)])
#endif /* sunos4 */
#endif /* !strerror */
+
+ /*
+ * this is so that we can check if a connection is non-blocking internally
+ * without the overhead of a function call
+ */
+ #define pqIsnonblocking(conn) (conn->nonblocking)

#endif /* LIBPQ_INT_H */

on a side note miscadmin.h causes problems on FreeBSD because it uses
pid_t without having included sys/types.h

thanks!

--
-Alfred Perlstein - [bright(at)wintelcom(dot)net|alfred(at)freebsd(dot)org]
