Index: build.xml
===================================================================
RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/build.xml,v
retrieving revision 1.47
diff -u -r1.47 build.xml
--- build.xml 8 Jun 2004 00:01:47 -0000 1.47
+++ build.xml 24 Jun 2004 09:26:26 -0000
@@ -119,10 +119,11 @@
-
+
+
Index: org/postgresql/PGConnection.java
===================================================================
RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/PGConnection.java,v
retrieving revision 1.8
diff -u -r1.8 PGConnection.java
--- org/postgresql/PGConnection.java 8 Jun 2004 00:01:47 -0000 1.8
+++ org/postgresql/PGConnection.java 24 Jun 2004 09:26:26 -0000
@@ -17,6 +17,7 @@
import java.sql.*;
import org.postgresql.core.Encoding;
+import org.postgresql.copy.CopyManager;
import org.postgresql.fastpath.Fastpath;
import org.postgresql.largeobject.LargeObjectManager;
@@ -30,6 +31,12 @@
*/
public PGNotification[] getNotifications();
+ /**
+ * This returns the COPY API for the current connection.
+ * @since 7.5
+ */
+ public CopyManager getCopyAPI() throws SQLException;
+
/**
* This returns the LargeObject API for the current connection.
* @since 7.3
Index: org/postgresql/errors.properties
===================================================================
RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/errors.properties,v
retrieving revision 1.34
diff -u -r1.34 errors.properties
--- org/postgresql/errors.properties 21 Jun 2004 03:09:44 -0000 1.34
+++ org/postgresql/errors.properties 24 Jun 2004 09:26:26 -0000
@@ -31,6 +31,10 @@
postgresql.con.tuple:Tuple received before MetaData.
postgresql.con.type:Unknown Response Type {0}
postgresql.con.user:The user property is missing. It is mandatory.
+postgresql.copy.ioerror:An IO error occurred while sending to the backend during COPY - {0}
+postgresql.copy.inputsource:An IO error occured while reading from a COPY input source - {0}
+postgresql.copy.outputsource:An IO error occured while writing to a COPY output source - {0}
+postgresql.copy.type:Copy unexpected response type {0}
postgresql.error.exception:Exception: {0}
postgresql.error.stacktrace:Stack Trace:
postgresql.error.stacktraceend:End of Stack Trace
Index: org/postgresql/jdbc1/AbstractJdbc1Connection.java
===================================================================
RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/jdbc1/AbstractJdbc1Connection.java,v
retrieving revision 1.39
diff -u -r1.39 AbstractJdbc1Connection.java
--- org/postgresql/jdbc1/AbstractJdbc1Connection.java 22 Jun 2004 09:36:32 -0000 1.39
+++ org/postgresql/jdbc1/AbstractJdbc1Connection.java 24 Jun 2004 09:26:27 -0000
@@ -29,6 +29,7 @@
import org.postgresql.core.PGStream;
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.StartupPacket;
+import org.postgresql.copy.CopyManager;
import org.postgresql.fastpath.Fastpath;
import org.postgresql.largeobject.LargeObjectManager;
import org.postgresql.util.MD5Digest;
@@ -993,6 +994,17 @@
// This holds a reference to the LargeObject API if already open
private LargeObjectManager largeobject = null;
+ // The copy Manager
+ private CopyManager copyManager = null;
+ public CopyManager getCopyAPI() throws SQLException
+ {
+ if (copyManager == null && getPGProtocolVersionMajor() >= 3)
+ copyManager = new CopyManager(this,pgStream);
+ return copyManager;
+ }
+
+
+
/*
* This method is used internally to return an object based around
* org.postgresql's more unique data types.
Index: org/postgresql/test/jdbc2/Jdbc2TestSuite.java
===================================================================
RCS file: /usr/local/cvsroot/pgjdbc/pgjdbc/org/postgresql/test/jdbc2/Jdbc2TestSuite.java,v
retrieving revision 1.13
diff -u -r1.13 Jdbc2TestSuite.java
--- org/postgresql/test/jdbc2/Jdbc2TestSuite.java 29 Mar 2004 19:17:12 -0000 1.13
+++ org/postgresql/test/jdbc2/Jdbc2TestSuite.java 24 Jun 2004 09:26:27 -0000
@@ -62,7 +62,8 @@
// Fastpath/LargeObject
suite.addTestSuite(BlobTest.class);
suite.addTestSuite(OID74Test.class);
-
+ suite.addTestSuite(CopyTest.class);
+
suite.addTestSuite(UpdateableResultTest.class );
suite.addTestSuite(CallableStmtTest.class );
Index: build.local.properties
===================================================================
RCS file: build.local.properties
diff -N build.local.properties
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ build.local.properties 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,11 @@
+# Default build parameters. These may be overridden by local configuration
+# settings in build.local.properties.
+#
+
+fullversion=7.5develschabi
+server=localhost
+port=5432
+database=jdbcdrivertest
+username=jdbcuser
+password=jdbc
+preparethreshold=42
Index: org/postgresql/copy/CopyManager.java
===================================================================
RCS file: org/postgresql/copy/CopyManager.java
diff -N org/postgresql/copy/CopyManager.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ org/postgresql/copy/CopyManager.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,238 @@
+package org.postgresql.copy;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+import java.sql.SQLException;
+
+import org.postgresql.core.BaseConnection;
+import org.postgresql.core.PGStream;
+import org.postgresql.core.Encoding;
+import org.postgresql.core.Notification;
+import org.postgresql.util.PSQLState;
+import org.postgresql.util.PSQLException;
+import org.postgresql.util.PSQLWarning;
+import org.postgresql.util.ServerErrorMessage;
+
+/**
+ * Implement COPY support in the JDBC driver. This requires a 7.4 server and a
+ * connection with the V3 protocol. Previous versions could not recover from
+ * errors and the connection had to be abandoned which was not acceptable.
+ */
+
+public class CopyManager {
+ private BaseConnection pgConn;
+ private PGStream pgStream;
+
+ public CopyManager(BaseConnection pgConn, PGStream pgStream) {
+ this.pgConn = pgConn;
+ this.pgStream = pgStream;
+ }
+
+ /**
+ * Copy data from the InputStream into the given table using the default
+ * copy parameters.
+ */
+ public void copyIn(String table, InputStream is) throws SQLException {
+ copyInQuery("COPY " + table + " FROM STDIN", is);
+ }
+
+ /**
+ * Copy data from the InputStream using the given COPY query. This allows
+ * specification of additional copy parameters such as the delimiter or NULL
+ * marker.
+ */
+ public void copyInQuery(String query, InputStream is) throws SQLException {
+
+ synchronized (pgStream) {
+ sendQuery(query);
+ copyResultLoop(is, null);
+ }
+
+ }
+
+ /**
+ * Copy data from the given table to the OutputStream using the default copy
+ * parameters.
+ */
+ public void copyOut(String table, OutputStream os) throws SQLException {
+ copyOutQuery("COPY " + table + " TO STDOUT", os);
+ }
+
+ /**
+ * Copy data to the OutputStream using the given COPY query. This allows
+ * specification of additional copy parameters such as the delimiter or NULL
+ * marker.
+ */
+ public void copyOutQuery(String query, OutputStream os) throws SQLException {
+ synchronized (pgStream) {
+ sendQuery(query);
+ copyResultLoop(null, os);
+ }
+ }
+
+ /**
+ * After the copy query has been go through the possible responses. The flag
+ * which tells us whether we are doing copy in or out is simply where the
+ * InputStream or OutputStream is null.
+ *
+ * This is much like the loop in QueryExecutor, it could be merged into
+ * that, but it would require some generalization of its current specific
+ * tasks. Right now it has its query in m_binds[] form and expects to return
+ * a ResultSet. A more pluggable network layer would be nice so we could
+ * support the V2 and V3 protocols more cleanly and consider a SPI based
+ * layer for an in server pl/java. In general I think it's a bad idea for
+ * PGStream to be seen anywhere outside of the QueryExecutor.
+ */
+ private void copyResultLoop(InputStream is, OutputStream os) throws SQLException {
+
+ Encoding encoding = pgConn.getEncoding();
+
+ PSQLException topLevelError = null;
+ ServerErrorMessage sem;
+ boolean queryDone = false;
+ while (!queryDone) {
+ int c = pgStream.ReceiveChar();
+
+ switch (c) {
+ case 'A' :// Asynch Notify
+ int pid = pgStream.ReceiveIntegerR(4);
+ String msg = pgStream.ReceiveString(encoding);
+ pgConn.addNotification(new Notification(msg, pid));
+ break;
+ case 'C' :// Command Complete
+ int commandLength = pgStream.ReceiveIntegerR(4);
+ String command = encoding.decode(pgStream.Receive(commandLength - 4 - 1));
+ pgStream.Receive(1);
+ break;
+ case 'E' :// Error Message
+ int errorLength = pgStream.ReceiveIntegerR(4);
+ String errorMessage = encoding.decode(pgStream.Receive(errorLength - 4));
+ sem = new ServerErrorMessage(errorMessage);
+
+ PSQLException error = new PSQLException(sem.toString(), new PSQLState(sem.getSQLState()));
+ if (topLevelError != null) {
+ topLevelError.setNextException(error);
+ } else {
+ topLevelError = error;
+ }
+ break;
+ case 'N' :// Error Notification
+ int notificationLength = pgStream.ReceiveIntegerR(4);
+ String notificationMessage = encoding.decode(pgStream.Receive(notificationLength - 4));
+ sem = new ServerErrorMessage(notificationMessage);
+ PSQLWarning warn = new PSQLWarning(sem);
+ pgConn.addWarning(warn);
+ break;
+ case 'G' :// CopyInResponse
+ if (is == null)
+ throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character(
+ (char) c));
+ receiveCopyInOutResponse();
+ sendCopyData(is);
+ break;
+ case 'H' :// CopyOutResponse
+ if (os == null)
+ throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character(
+ (char) c));
+ receiveCopyInOutResponse();
+ break;
+ case 'd' :// CopyData
+ if (os == null)
+ throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character(
+ (char) c));
+ receiveCopyData(os);
+ break;
+ case 'c' :// CopyDone
+ int copyDoneLength = pgStream.ReceiveIntegerR(4);
+ break;
+ case 'Z' :// ReadyForQuery
+ int messageLength = pgStream.ReceiveIntegerR(4);
+ char messageStatus = (char) pgStream.ReceiveChar();
+ queryDone = true;
+ break;
+ default :
+ throw new PSQLException("postgresql.copy.type", PSQLState.COMMUNICATION_ERROR, new Character((char) c));
+ }
+ }
+
+ if (topLevelError != null)
+ throw topLevelError;
+
+ }
+
+ private void sendQuery(String query) throws SQLException {
+ Encoding encoding = pgConn.getEncoding();
+ try {
+ pgStream.SendChar('Q');
+ byte message[] = encoding.encode(query);
+ int messageSize = 4 + message.length + 1;
+ pgStream.SendInteger(messageSize, 4);
+ pgStream.Send(message);
+ pgStream.SendChar(0);
+ pgStream.flush();
+ } catch (IOException ioe) {
+ throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe);
+ }
+ }
+
+ private void sendCopyData(InputStream is) throws SQLException {
+ byte buf[] = new byte[8192];
+
+ int read = 0;
+
+ while (read >= 0) {
+ try {
+ read = is.read(buf);
+ } catch (IOException ioe) {
+ throw new PSQLException("postgresql.copy.inputsource", PSQLState.DATA_ERROR, ioe);
+ }
+
+ if (read > 0) {
+ try {
+ pgStream.SendChar('d');
+ int messageSize = read + 4;
+ pgStream.SendInteger(messageSize, 4);
+ pgStream.Send(buf, read);
+ } catch (IOException ioe) {
+ throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION,
+ ioe);
+ }
+ }
+ }
+
+ // Send the CopyDone message
+ try {
+ pgStream.SendChar('c');
+ pgStream.SendInteger(4, 4);
+ pgStream.flush();
+ } catch (IOException ioe) {
+ throw new PSQLException("postgresql.copy.ioerror", PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe);
+ }
+ }
+
+ /**
+ * CopyInResponse and CopyOutResponse have the same field layouts and we
+ * simply discard the results.
+ */
+ private void receiveCopyInOutResponse() throws SQLException {
+ int messageLength = pgStream.ReceiveIntegerR(4);
+ int copyFormat = pgStream.ReceiveIntegerR(1);
+
+ int numColumns = pgStream.ReceiveIntegerR(2);
+ for (int i = 0; i < numColumns; i++) {
+ int copyColumnFormat = pgStream.ReceiveIntegerR(2);
+ }
+ }
+
+ private void receiveCopyData(OutputStream os) throws SQLException {
+ int messageLength = pgStream.ReceiveIntegerR(4);
+ byte data[] = pgStream.Receive(messageLength - 4);
+ try {
+ os.write(data);
+ } catch (IOException ioe) {
+ throw new PSQLException("postgresql.copy.outputsource", PSQLState.DATA_ERROR, ioe);
+ }
+ }
+}
Index: org/postgresql/test/jdbc2/CopyTest.java
===================================================================
RCS file: org/postgresql/test/jdbc2/CopyTest.java
diff -N org/postgresql/test/jdbc2/CopyTest.java
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ org/postgresql/test/jdbc2/CopyTest.java 1 Jan 1970 00:00:00 -0000
@@ -0,0 +1,109 @@
+package org.postgresql.test.jdbc2;
+
+import junit.framework.TestCase;
+import org.postgresql.test.TestUtil;
+import org.postgresql.copy.CopyManager;
+import java.io.*;
+import java.sql.*;
+
+public class CopyTest extends TestCase {
+ private Connection conn;
+ private CopyManager copyManager;
+
+ protected void setUp() throws SQLException {
+ conn = TestUtil.openDB();
+ TestUtil.createTable(conn, "copytesttable", "a int, b text, c float, d text");
+ copyManager = ((org.postgresql.PGConnection) conn).getCopyAPI();
+ }
+
+ protected void tearDown() throws SQLException {
+ TestUtil.dropTable(conn, "copytesttable");
+ TestUtil.closeDB(conn);
+ }
+
+ private byte[] getData() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ ps.println("35\tSome words go here\t3.14159\tz\\na");
+ ps.println("44\tMore text\t2.71828\th");
+ return baos.toByteArray();
+ }
+
+ public void testCopyIn() throws SQLException {
+ if (((org.postgresql.core.BaseConnection) conn).getPGProtocolVersionMajor() < 3)
+ return;
+ InputStream is = new ByteArrayInputStream(getData());
+ copyManager.copyIn("copytesttable", is);
+
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT a,b,c,d FROM copytesttable");
+ int rowcount = 0;
+ while (rs.next()) {
+ if (rowcount == 0) {
+ assertEquals(rs.getInt(1), 35);
+ assertEquals(rs.getString(2), "Some words go here");
+ assertEquals(rs.getDouble(3), 3.14159, 0.00001);
+ assertEquals(rs.getString(4), "z\na");
+ } else if (rowcount == 1) {
+ assertEquals(rs.getInt(1), 44);
+ assertEquals(rs.getString(2), "More text");
+ assertEquals(rs.getDouble(3), 2.71828, 0.00001);
+ assertEquals(rs.getString(4), "h");
+ } else {
+ fail("Too many rows returned.");
+ }
+ rowcount++;
+ }
+ assertEquals(rowcount, 2);
+
+ rs.close();
+ stmt.close();
+ }
+
+ public void testCopyOut() throws SQLException {
+ if (((org.postgresql.core.BaseConnection) conn).getPGProtocolVersionMajor() < 3)
+ return;
+ PreparedStatement pstmt = conn.prepareStatement("INSERT INTO copytesttable(a,b,c,d) VALUES (?,?,?,?)");
+
+ pstmt.setInt(1, 35);
+ pstmt.setString(2, "Some words go here");
+ pstmt.setDouble(3, 3.14159);
+ pstmt.setString(4, "z\na");
+ pstmt.executeUpdate();
+
+ pstmt.setInt(1, 44);
+ pstmt.setString(2, "More text");
+ pstmt.setDouble(3, 2.71828);
+ pstmt.setString(4, "h");
+ pstmt.executeUpdate();
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ copyManager.copyOut("copytesttable", baos);
+
+ byte orig[] = getData();
+ byte server[] = baos.toByteArray();
+
+ assertEquals(orig.length, server.length);
+ for (int i = 0; i < orig.length; i++) {
+ assertEquals(orig[i], server[i]);
+ }
+
+ pstmt.close();
+ }
+
+ public void testCopyInOut() throws SQLException {
+ if (((org.postgresql.core.BaseConnection) conn).getPGProtocolVersionMajor() < 3)
+ return;
+ byte orig[] = getData();
+ copyManager.copyIn("copytesttable", new ByteArrayInputStream(orig));
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ copyManager.copyOut("copytesttable", baos);
+ byte server[] = baos.toByteArray();
+
+ assertEquals(orig.length, server.length);
+ for (int i = 0; i < orig.length; i++) {
+ assertEquals(orig[i], server[i]);
+ }
+ }
+}