Re: Fix for non-blocking connections in libpq

From: Bernhard Herzog <bh(at)intevation(dot)de>
To: Bruce Momjian <pgman(at)candle(dot)pha(dot)pa(dot)us>
Cc: pgsql-patches(at)postgresql(dot)org
Subject: Re: Fix for non-blocking connections in libpq
Date: 2002-02-25 11:15:26
Message-ID: 6qelj9bynl.fsf@abnoba.intevation.de
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-patches

Bruce Momjian <pgman(at)candle(dot)pha(dot)pa(dot)us> writes:

> Bernard, just checking. Is this the most recent version of your patch?

In principle, yes. However, I've ported it to the CVS version in the
meantime. Here's a patch against current CVS HEAD:

Index: src/interfaces/libpq/fe-exec.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-exec.c,v
retrieving revision 1.113
diff -c -r1.113 fe-exec.c
*** src/interfaces/libpq/fe-exec.c 2001/10/25 05:50:13 1.113
--- src/interfaces/libpq/fe-exec.c 2002/02/25 10:21:06
***************
*** 2340,2342 ****
--- 2340,2350 ----

return (pqFlush(conn));
}
+
+ /* try to force data out, really only useful for non-blocking users.
+ * This implementation actually works for non-blocking connections */
+ int
+ PQsendSome(PGconn *conn)
+ {
+ return pqSendSome(conn);
+ }
Index: src/interfaces/libpq/fe-misc.c
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/fe-misc.c,v
retrieving revision 1.65
diff -c -r1.65 fe-misc.c
*** src/interfaces/libpq/fe-misc.c 2001/12/03 00:28:24 1.65
--- src/interfaces/libpq/fe-misc.c 2002/02/25 10:21:06
***************
*** 110,164 ****
static int
pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
{
! size_t avail = Max(conn->outBufSize - conn->outCount, 0);
!
! /*
! * if we are non-blocking and the send queue is too full to buffer
! * this request then try to flush some and return an error
*/
! if (pqIsnonblocking(conn) && nbytes > avail && pqFlush(conn))
{
! /*
! * even if the flush failed we may still have written some data,
! * recalculate the size of the send-queue relative to the amount
! * we have to send, we may be able to queue it afterall even
! * though it's not sent to the database it's ok, any routines that
! * check the data coming from the database better call pqFlush()
! * anyway.
! */
! if (nbytes > Max(conn->outBufSize - conn->outCount, 0))
! {
! printfPQExpBuffer(&conn->errorMessage,
! libpq_gettext("could not flush enough data (space available: %d, space needed %d)\n"),
! (int) Max(conn->outBufSize - conn->outCount, 0),
! (int) nbytes);
! return EOF;
! }
! /* fixup avail for while loop */
avail = Max(conn->outBufSize - conn->outCount, 0);
! }

! /*
! * is the amount of data to be sent is larger than the size of the
! * output buffer then we must flush it to make more room.
! *
! * the code above will make sure the loop conditional is never true for
! * non-blocking connections
! */
! while (nbytes > avail)
! {
! memcpy(conn->outBuffer + conn->outCount, s, avail);
! conn->outCount += avail;
! s += avail;
! nbytes -= avail;
! if (pqFlush(conn))
! return EOF;
! avail = conn->outBufSize;
! }

! memcpy(conn->outBuffer + conn->outCount, s, nbytes);
! conn->outCount += nbytes;

return 0;
}

--- 110,184 ----
static int
pqPutBytes(const char *s, size_t nbytes, PGconn *conn)
{
! /* Strategy to handle blocking and non-blocking connections: Fill
! * the output buffer and flush it repeatedly until either all data
! * has been sent or is at least queued in the buffer.
! *
! * For non-blocking connections, grow the buffer if not all data
! * fits into it and the buffer can't be sent because the socket
! * would block.
*/
!
! while (nbytes)
{
! size_t avail, remaining;
!
! /* fill the output buffer */
avail = Max(conn->outBufSize - conn->outCount, 0);
! remaining = Min(avail, nbytes);
! memcpy(conn->outBuffer + conn->outCount, s, remaining);
! conn->outCount += remaining;
! s += remaining;
! nbytes -= remaining;
!
! /* if the data didn't fit completely into the buffer, try to
! * flush the buffer */
! if (nbytes)
! {
! int send_result = pqSendSome(conn);

! /* if there were errors, report them */
! if (send_result < 0)
! return EOF;

! /* if not all data could be sent, increase the output
! * buffer, put the rest of s into it and return
! * successfully. This case will only happen in a
! * non-blocking connection
! */
! if (send_result > 0)
! {
! /* try to grow the buffer.
! * FIXME: The new size could be chosen more
! * intelligently.
! */
! size_t buflen = conn->outCount + nbytes;
! if (buflen > conn->outBufSize)
! {
! char * newbuf = realloc(conn->outBuffer, buflen);
! if (!newbuf)
! {
! /* realloc failed. Probably out of memory */
! printfPQExpBuffer(&conn->errorMessage,
! "cannot allocate memory for output buffer\n");
! return EOF;
! }
! conn->outBuffer = newbuf;
! conn->outBufSize = buflen;
! }
! /* put the data into it */
! memcpy(conn->outBuffer + conn->outCount, s, nbytes);
! conn->outCount += nbytes;

+ /* report success. */
+ return 0;
+ }
+ }
+
+ /* pqSendSome was able to send all data. Continue with the next
+ * chunk of s. */
+ } /* while */
+
return 0;
}

***************
*** 604,613 ****
}

/*
! * pqFlush: send any data waiting in the output buffer
*/
int
! pqFlush(PGconn *conn)
{
char *ptr = conn->outBuffer;
int len = conn->outCount;
--- 624,636 ----
}

/*
! * pqSendSome: send any data waiting in the output buffer.
! *
! * Return 0 on success, -1 on failure and 1 when data remains because the
! * socket would block and the connection is non-blocking.
*/
int
! pqSendSome(PGconn *conn)
{
char *ptr = conn->outBuffer;
int len = conn->outCount;
***************
*** 616,622 ****
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("connection not open\n"));
! return EOF;
}

/*
--- 639,645 ----
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("connection not open\n"));
! return -1;
}

/*
***************
*** 674,680 ****
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext(
"server closed the connection unexpectedly\n"
! "\tThis probably means the server terminated abnormally\n"
"\tbefore or while processing the request.\n"));

/*
--- 697,703 ----
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext(
"server closed the connection unexpectedly\n"
! "\tThis probably means the server terminated abnormally\n"
"\tbefore or while processing the request.\n"));

/*
***************
*** 685,698 ****
* the socket open until pqReadData finds no more data
* can be read.
*/
! return EOF;

default:
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("could not send data to server: %s\n"),
SOCK_STRERROR(SOCK_ERRNO));
/* We don't assume it's a fatal error... */
! return EOF;
}
}
else
--- 708,721 ----
* the socket open until pqReadData finds no more data
* can be read.
*/
! return -1;

default:
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("could not send data to server: %s\n"),
SOCK_STRERROR(SOCK_ERRNO));
/* We don't assume it's a fatal error... */
! return -1;
}
}
else
***************
*** 707,713 ****

/*
* if the socket is in non-blocking mode we may need to abort
! * here
*/
#ifdef USE_SSL
/* can't do anything for our SSL users yet */
--- 730,736 ----

/*
* if the socket is in non-blocking mode we may need to abort
! * here and return 1 to indicate that data is still pending.
*/
#ifdef USE_SSL
/* can't do anything for our SSL users yet */
***************
*** 719,732 ****
/* shift the contents of the buffer */
memmove(conn->outBuffer, ptr, len);
conn->outCount = len;
! return EOF;
}
#ifdef USE_SSL
}
#endif

if (pqWait(FALSE, TRUE, conn))
! return EOF;
}
}

--- 742,755 ----
/* shift the contents of the buffer */
memmove(conn->outBuffer, ptr, len);
conn->outCount = len;
! return 1;
}
#ifdef USE_SSL
}
#endif

if (pqWait(FALSE, TRUE, conn))
! return -1;
}
}

***************
*** 735,740 ****
--- 758,783 ----
if (conn->Pfdebug)
fflush(conn->Pfdebug);

+ return 0;
+ }
+
+
+
+ /*
+ * pqFlush: send any data waiting in the output buffer
+ *
+ * Implemented in terms of pqSendSome to recreate the old behavior, which
+ * returned 0 if all data was sent and EOF otherwise. EOF was returned
+ * regardless of whether an error occurred or not all data was sent on a
+ * non-blocking socket.
+ */
+ int
+ pqFlush(PGconn *conn)
+ {
+ if (pqSendSome(conn))
+ {
+ return EOF;
+ }
return 0;
}

Index: src/interfaces/libpq/libpq-fe.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-fe.h,v
retrieving revision 1.80
diff -c -r1.80 libpq-fe.h
*** src/interfaces/libpq/libpq-fe.h 2001/11/08 20:37:52 1.80
--- src/interfaces/libpq/libpq-fe.h 2002/02/25 10:21:06
***************
*** 279,284 ****
--- 279,285 ----

/* Force the write buffer to be written (or at least try) */
extern int PQflush(PGconn *conn);
+ extern int PQsendSome(PGconn *conn);

/*
* "Fast path" interface --- not really recommended for application
Index: src/interfaces/libpq/libpq-int.h
===================================================================
RCS file: /projects/cvsroot/pgsql/src/interfaces/libpq/libpq-int.h,v
retrieving revision 1.44
diff -c -r1.44 libpq-int.h
*** src/interfaces/libpq/libpq-int.h 2001/11/05 17:46:38 1.44
--- src/interfaces/libpq/libpq-int.h 2002/02/25 10:21:06
***************
*** 323,328 ****
--- 323,329 ----
extern int pqPutInt(int value, size_t bytes, PGconn *conn);
extern int pqReadData(PGconn *conn);
extern int pqFlush(PGconn *conn);
+ extern int pqSendSome(PGconn *conn);
extern int pqWait(int forRead, int forWrite, PGconn *conn);
extern int pqReadReady(PGconn *conn);
extern int pqWriteReady(PGconn *conn);

Bernhard

--
Intevation GmbH http://intevation.de/
Sketch http://sketch.sourceforge.net/
MapIt! http://mapit.de/

In response to

Responses

Browse pgsql-patches by date

  From Date Subject
Next Message Fernando Nasser 2002-02-25 14:46:05 Re: [HACKERS] Updated TODO item
Previous Message Janardhana Reddy 2002-02-25 07:11:32 Re: [PATCHES] WAL Performance Improvements