From: | Sergei Sheinin <sergei(dot)sheinin(at)gmail(dot)com> |
---|---|
To: | pgsql-announce(at)postgresql(dot)org |
Subject: | Using Skytools PGQ for targeted copying of data in cluster |
Date: | 2012-11-20 06:00:29 |
Message-ID: | CA+dTRwnbJ5fNdYSpGab=O=9yd6tZeFjPUauM6=DtczOLxqMzXA@mail.gmail.com |
Lists: | pgsql-announce |
In a clustered database where some data on one node is shared with some
other nodes, there is a need for targeted replication at the row level.
This is not possible with Slony replication, because it copies all data.
Using PGQ in a layout where all nodes are directly interconnected (a full
mesh) would require about <number of nodes>^2 processes, which is already
a high number for a 256-node cluster (65,536).
Solution: a push-pull queue scheme that uses pgq.RemoteConsumer to send
"copy" messages carrying object data.
There are two ticker processes (one for push_queue, one for pull_queue) and
two consumer processes per node, making the total number of processes
2 × <number of nodes> + 2.
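To make the scaling argument concrete, the two process counts can be compared directly. This is a minimal sketch: it uses the post's own <number of nodes>^2 estimate for the mesh layout and the 2 × <number of nodes> + 2 count for the push-pull scheme; the function names are illustrative only.

```python
def mesh_process_count(nodes: int) -> int:
    """Processes for a full mesh, using the post's <number of nodes>^2 estimate."""
    return nodes ** 2


def push_pull_process_count(nodes: int) -> int:
    """Two tickers (push_queue and pull_queue) plus two consumers per node."""
    return 2 * nodes + 2


for n in (16, 64, 256):
    print(f"{n} nodes: mesh {mesh_process_count(n)}, "
          f"push-pull {push_pull_process_count(n)}")
```

For 256 nodes this gives 65,536 processes for the mesh versus 514 for push-pull, so the push-pull count grows linearly rather than quadratically.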
On the push queue side, the sending node inserts one event per target node:
pgq.insert_event('push_queue', 'copy', '', <node number>, 'action=ins&id=1&name=test', '', '');
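The push-side call above can be generated from client code roughly as follows. This is a sketch under stated assumptions: the helper names `build_push_events` and `send_push_events` are hypothetical, the connection is assumed to be a psycopg2-style DB-API connection (`%s` placeholders), and the payload is encoded with the standard library's `urlencode` as a stand-in for whatever encoding the sending node actually uses. Field placement (target node in ev_extra1, row data in ev_extra2) follows the call in the post.

```python
from urllib.parse import urlencode

# 7-argument form of pgq.insert_event:
# (queue, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4)
INSERT_EVENT_SQL = "select pgq.insert_event(%s, %s, %s, %s, %s, %s, %s)"


def build_push_events(target_nodes, row):
    """Return one (sql, params) pair per target node (hypothetical helper).

    row is a list of (column, value) pairs so the payload key order is
    stable; ev_extra1 carries the target node number as text.
    """
    payload = urlencode(row)
    return [
        (INSERT_EVENT_SQL,
         ("push_queue", "copy", "", str(node), payload, "", ""))
        for node in target_nodes
    ]


def send_push_events(conn, target_nodes, row):
    """Insert all events in one transaction on an assumed DB-API connection."""
    cur = conn.cursor()
    for sql, params in build_push_events(target_nodes, row):
        cur.execute(sql, params)
    conn.commit()


events = build_push_events([3, 7], [("action", "ins"), ("id", 1), ("name", "test")])
for sql, params in events:
    print(params)
```

Inserting all target events in a single transaction means either every target node gets its copy event or none does.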
The push queue consumer calls a function that relays the same event into the pull queue:
pgq.insert_event('pull_queue', 'copy', '', <node number>, 'action=ins&id=1&name=test', '', '');
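The relay step above amounts to re-inserting each event's fields unchanged into pull_queue. The sketch below assumes an event object exposing `type`, `data` and `extra1` to `extra4` attributes (as Skytools PGQ events do) and any DB-API cursor; `relay_to_pull_queue` and `RecordingCursor` are hypothetical names, and the recording cursor only stands in for a real database so the example runs on its own.

```python
from types import SimpleNamespace

RELAY_SQL = "select pgq.insert_event(%s, %s, %s, %s, %s, %s, %s)"


def relay_to_pull_queue(cur, ev):
    """Re-insert one push_queue event into pull_queue with the same fields."""
    cur.execute(RELAY_SQL, ("pull_queue", ev.type, ev.data,
                            ev.extra1, ev.extra2, ev.extra3, ev.extra4))


class RecordingCursor:
    """Stand-in cursor that records statements instead of executing them."""

    def __init__(self):
        self.calls = []

    def execute(self, sql, params=None):
        self.calls.append((sql, params))


cur = RecordingCursor()
ev = SimpleNamespace(type="copy", data="", extra1="3",
                     extra2="action=ins&id=1&name=test", extra3="", extra4="")
relay_to_pull_queue(cur, ev)
print(cur.calls[0][1])
```

In a real push consumer this call would run once per event in the batch, followed by `ev.tag_done()`.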
Before calling ev.tag_done(), the pull queue consumer checks that the node it
is running on is the event's intended recipient. This guards against data
loss if a node becomes unresponsive: the undelivered event stays in the
queue until that node reconnects.
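The recipient check can be modelled without a database. This sketch assumes each event is reduced to an (ev_type, ev_extra1, ev_extra2) tuple and that the payload is URL-encoded; decoding it with `parse_qsl` is an assumption for illustration, since in the post the payload is handed to `__data_copier`, whose body is not shown. `events_for_node` is a hypothetical name.

```python
from urllib.parse import parse_qsl


def events_for_node(events, node):
    """Yield decoded payloads of 'copy' events addressed to this node.

    node is compared as text, matching a value read from the .ini file.
    """
    for ev_type, target, payload in events:
        if ev_type == "copy" and target == node:
            yield dict(parse_qsl(payload))


batch = [
    ("copy", "3", "action=ins&id=1&name=test"),
    ("copy", "7", "action=ins&id=2&name=other"),
    ("other", "3", ""),
]
mine = list(events_for_node(batch, "3"))
print(mine)
```

Each node's consumer sees the whole batch but acts only on the events whose ev_extra1 matches its own node number.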
## pull.py
import sys

import pgq


class Copier(pgq.RemoteConsumer):
    def __init__(self, args):
        pgq.RemoteConsumer.__init__(self, "pull_ticker", "src_db",
                                    "dst_db", args)
        ### GET "node" SETTING FROM .ini
        ### MUST ADD "node = <node number>" TO .ini
        self.node = self.cf.get("node")

    def process_remote_batch(self, db, batch_id, event_list, dst_db):
        cur = dst_db.cursor()
        for ev in event_list:
            # Acknowledge events of any other type and skip them.
            if ev.type != 'copy':
                ev.tag_done()
                continue
            ### CHECK TARGET IS CURRENT NODE
            if self.node == ev.ev_extra1:
                ### COMMIT EVENT IF IT IS PROCESSING ON TARGET SERVER
                ev.tag_done()
                # Pass values as query parameters so psycopg2 quotes them.
                cur.execute("select __data_copier(%s, %s)",
                            (ev.ev_extra1, ev.ev_extra2))
        # No dst_db.commit() here: RemoteConsumer commits dst_db together
        # with its batch-tracking record after this method returns.


if __name__ == '__main__':
    script = Copier(sys.argv[1:])
    script.start()