Can psycopg2 copy_expert read from an io.StringIO() buffer?

From: Jeff Ross <jross(at)openvistas(dot)net>
To: psycopg(at)lists(dot)postgresql(dot)org
Subject: Can psycopg2 copy_expert read from an io.StringIO() buffer?
Date: 2022-11-03 20:15:11
Message-ID: 6dfed0bf-a52a-66c0-43a9-6258d7b9843b@openvistas.net
Lists: psycopg

Hi all,

Bit of explanation first...

I'm working on a script to "heal" a logically replicated database after
a replication outage. We get these outages periodically, especially on
the several databases we have hosted in RDS. Replication either stops
outright or slows so much that it might as well be stopped. When that
happens, the only fix I've found is to drop the subscription and then
re-create it with (copy_data = false).
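
For reference, the drop/re-create step looks roughly like this (a sketch
only -- the subscription, publication, and connection names here are
placeholders, not our real ones):

import psycopg2

# subscription DDL that creates or drops a replication slot can't run
# inside a transaction block, so use autocommit
sub_conn = psycopg2.connect("dbname=subscriber_db")
sub_conn.set_session(autocommit=True)
cur = sub_conn.cursor()
cur.execute("drop subscription if exists heal_sub;")
cur.execute("""
create subscription heal_sub
connection 'host=publisher.example.com dbname=publisher_db'
publication heal_pub
with (copy_data = false);
""")
cur.close()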

Once restarted, replication proceeds from that point on at full speed.
However, we now have a nice-sized hole in the data that needs to be
backfilled.

I've been doing this by joining the publisher and subscriber with
postgres_fdw and comparing tables. A left join between publisher and
subscriber shows me the rows on the publisher that are not on the
subscriber; flipping the tables finds the rows on the subscriber that
are not on the publisher and need to be deleted. Rows that need to be
updated are found with select * from publisher.table except select *
from subscriber.table.
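
For example, the comparison queries look something like this (placeholder
schema/table/key names -- "pub" being the postgres_fdw schema that points at
the publisher and "sub" the local subscriber schema):

missing_on_subscriber = """
select p.some_pkey
from pub.some_table p
left join sub.some_table s on s.some_pkey = p.some_pkey
where s.some_pkey is null
"""

extra_on_subscriber = """
select s.some_pkey
from sub.some_table s
left join pub.some_table p on p.some_pkey = s.some_pkey
where p.some_pkey is null
"""

needs_update = """
select * from pub.some_table
except
select * from sub.some_table
"""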

I had been doing this in a plpy function, both reading from and writing
to the fdw tables. That works but is excruciatingly slow--one table we
have has 170,000,000 rows and takes about 3 hours to sync. A 325G
database takes about 6.5 hours total.

Enter psycopg2. My plan now is to query the fdw tables only to identify
the rows, then use psycopg2 connections to both databases to insert into,
delete from, and update (by deleting and then re-inserting) the
subscriber table directly.
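
For the update case, what I have in mind is roughly this (placeholder table
and column names; the real script builds these from the fdw comparison):

# subscriber_connection is the existing psycopg2 connection to the subscriber
changed_pkeys = [101, 102, 103]  # hypothetical pkeys from the except query

delete_cursor = subscriber_connection.cursor()
delete_cursor.execute(
    "delete from some_schema.some_table where some_pkey = any(%s);",
    (changed_pkeys,))
delete_cursor.close()
# ...then copy the same pkeys back out of the publisher and into the
# subscriber, just like the insert code below.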

Working on inserts now, with this code (queries freshly ported over from
the plpy function):

subscriber_connection.set_session(autocommit=True)
csv_buf = io.StringIO()
size = 3**20 #3GB

copy_query = """
copy (
select a.* from %s.%s a
join %s.pkeys b on a.%s = b.%s
order by a.%s
) to stdout with csv
""" % (schema,table_name,schema,pkey,pkey_column,pkey_column)

insert_query = """
copy %s.%s from stdin with csv;
""" % (schema,table_name)

try:
    publisher_copy_cursor.copy_expert(copy_query,csv_buf,size)
    subscriber_copy_cursor.copy_expert(insert_query,csv_buf)
    subscriber_copy_cursor.close()
except Exception as e:
    debug_output("insert error! %s" % (e))

So, most of that works pretty well. csv_buf gets filled with the rows
missing from the subscriber (selected by their pkeys), and I've verified
that from the python shell.
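
For example, in the shell the buffer clearly has data in it after the first
copy_expert (roughly what I did to check):

publisher_copy_cursor.copy_expert(copy_query, csv_buf, size)
print(len(csv_buf.getvalue()))   # a large, non-zero character count
print(csv_buf.getvalue()[:200])  # the first couple of CSV lines look right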

On the subscriber I see the copy command hit the logs:

2022-11-03 14:27:15.357 EDT,"postgres","dr_metroarchive",43810,"172.26.27.10:39614",636407fc.ab22,30,"COPY",2022-11-03 14:27:08 EDT,4/0,0,LOG,00000,"duration: 53.782 ms statement: copy metro.client_profile from stdin with csv;",,,,,,,,,""

But no rows ever actually get inserted.

The only thing I can think of so far is that copy_expert isn't reading
csv_buf. And now that I write that I wonder if it's because csv_buf is
in memory on the publisher and not on the subscriber.

If that's the case, maybe this all boils down to "how do I pipe stdout
from the publisher to the subscriber across the subscriber cursor?"
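
Something along these lines is what I picture -- an untested sketch that
pipes the publisher's COPY TO output straight into the subscriber's COPY
FROM instead of staging it in csv_buf:

import os
import threading

read_fd, write_fd = os.pipe()
reader = os.fdopen(read_fd, "rb")
writer = os.fdopen(write_fd, "wb")

def produce():
    try:
        publisher_copy_cursor.copy_expert(copy_query, writer)
    finally:
        writer.close()  # EOF for the subscriber side

producer = threading.Thread(target=produce)
producer.start()
subscriber_copy_cursor.copy_expert(insert_query, reader)
producer.join()
reader.close()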

That's a lot of email for one short question--apologies and thanks in
advance for any clue by fours!

Jeff Ross
