Using Skytools PGQ for targeted copying of data in cluster

From: Sergei Sheinin <sergei(dot)sheinin(at)gmail(dot)com>
To: pgsql-announce(at)postgresql(dot)org
Subject: Using Skytools PGQ for targeted copying of data in cluster
Date: 2012-11-20 06:00:29
Message-ID: CA+dTRwnbJ5fNdYSpGab=O=9yd6tZeFjPUauM6=DtczOLxqMzXA@mail.gmail.com
Lists: pgsql-announce

In a clustered database where some data on one node is shared with only some
of the other nodes, replication has to be targeted at the row level.
This is not possible with Slony replication, because it copies all data.
Using PGQ in a schema where all nodes are directly interconnected would
require <number of nodes>^2 processes to run (65,536 for a 256-node
cluster).

Solution: a push-pull queue schema that uses pgq.RemoteConsumer to send copy
messages carrying object data.

There are two ticker processes (push_queue and pull_queue) and two consumer
processes per node, for a total of <number of nodes> * 2 + 2 processes.
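
To make the difference concrete, the two process counts can be compared with a few lines of Python. This is only a sketch of the arithmetic stated above; the function names are made up for illustration.

```python
def mesh_process_count(nodes):
    """Processes needed when every node is connected to every node: N^2."""
    return nodes ** 2


def push_pull_process_count(nodes):
    """Two consumers per node plus two tickers: N * 2 + 2."""
    return nodes * 2 + 2


# Print node count, mesh processes, push-pull processes.
for n in (4, 16, 256):
    print(n, mesh_process_count(n), push_pull_process_count(n))
```

For a 256-node cluster this gives 65,536 processes for the fully interconnected layout against 514 for the push-pull layout.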

On the push queue side, the sending node inserts one event per target node:

    select pgq.insert_event('push_queue', 'copy', '', <node number>, 'action=ins&id=1&name=test', '', '');
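
A sending application could build these per-node events as follows. This is a minimal Python 3 sketch, not part of the original setup: build_push_events and its arguments are hypothetical names, and the result is a list of (SQL, parameters) pairs meant to be passed to a DB-API cursor such as psycopg2's.

```python
from urllib.parse import urlencode

# pgq.insert_event(queue, ev_type, ev_data, ev_extra1, ev_extra2, ev_extra3, ev_extra4)
INSERT_EVENT_SQL = "select pgq.insert_event(%s, %s, %s, %s, %s, %s, %s)"


def build_push_events(target_nodes, action, row):
    """Return one (sql, params) pair per target node.

    The node number travels in ev_extra1 and the urlencoded row in ev_extra2,
    matching the layout used in this post.
    """
    payload = urlencode([("action", action)] + list(row.items()))
    return [
        (INSERT_EVENT_SQL, ("push_queue", "copy", "", str(node), payload, "", ""))
        for node in target_nodes
    ]


events = build_push_events([2, 5], "ins", {"id": 1, "name": "test"})
for sql, params in events:
    print(sql, params)
```

Each pair can then be run with cursor.execute(sql, params), which leaves quoting to the driver.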

The push queue consumer calls a function that relays the same event into the
pull queue:

    select pgq.insert_event('pull_queue', 'copy', '', <node number>, 'action=ins&id=1&name=test', '', '');
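
The relay step might look roughly like this in Python 3. It is a sketch under assumptions: Event is a hypothetical stand-in for a PGQ event object, relay_to_pull_queue is an invented helper, and RecordingCursor only imitates a DB-API cursor so the example runs without a database.

```python
from collections import namedtuple

# Hypothetical stand-in for a PGQ event; only the fields relayed here.
Event = namedtuple("Event", "type data extra1 extra2 extra3 extra4")

RELAY_SQL = "select pgq.insert_event(%s, %s, %s, %s, %s, %s, %s)"


def relay_to_pull_queue(cursor, ev, pull_queue="pull_queue"):
    """Re-insert an event from the push queue, unchanged, into the pull queue."""
    cursor.execute(
        RELAY_SQL,
        (pull_queue, ev.type, ev.data, ev.extra1, ev.extra2, ev.extra3, ev.extra4),
    )


class RecordingCursor:
    """Fake cursor that records calls instead of talking to a database."""

    def __init__(self):
        self.calls = []

    def execute(self, sql, params):
        self.calls.append((sql, params))


cur = RecordingCursor()
relay_to_pull_queue(cur, Event("copy", "", "3", "action=ins&id=1&name=test", "", ""))
print(cur.calls[0][1])
```

In a real push consumer the cursor would come from the database that hosts pull_queue.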

Before issuing ev.tag_done(), the pull queue consumer checks that the node it
is running on is the event's intended recipient. This guards against data
loss should a node become unresponsive, because the unacknowledged event is
kept in the queue until the node reconnects.
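
The decision the pull consumer makes for each event can be isolated as a small pure function. The following Python 3 sketch is illustrative only: decide and decode_payload are hypothetical names, and the returned strings are labels for this example rather than PGQ states.

```python
from urllib.parse import parse_qsl


def decide(local_node, ev_type, target_node):
    """Return what the pull consumer should do with one event."""
    if ev_type != "copy":
        return "done"            # not a copy message: just acknowledge it
    if str(local_node) == str(target_node):
        return "copy-and-done"   # this node is the recipient: copy, then acknowledge
    return "leave-in-queue"      # meant for another node: do not acknowledge


def decode_payload(extra2):
    """Turn 'action=ins&id=1&name=test' into a dict of strings."""
    return dict(parse_qsl(extra2))


print(decide("3", "copy", "3"))
print(decide("3", "copy", "7"))
print(decode_payload("action=ins&id=1&name=test"))
```

Comparing both node values as strings avoids a mismatch between a number read from the .ini file and the text stored in ev_extra1.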

## pull.py
import sys, os, pgq, skytools
import psycopg2

class Copier(pgq.RemoteConsumer):
    def __init__(self, args):
        pgq.RemoteConsumer.__init__(self, "pull_ticker", "src_db",
                                    "dst_db", args)
        ### GET "node" SETTING FROM .ini
        ### MUST ADD "node = <node number>" TO .ini
        self.node = self.cf.get("node")

    def process_remote_batch(self, db, batch_id, event_list, dst_db):
        for ev in event_list:
            ### ACKNOWLEDGE AND SKIP EVENTS THAT ARE NOT COPY MESSAGES
            if ev.type != 'copy':
                ev.tag_done()
                continue
            ### CHECK TARGET IS CURRENT NODE
            if self.node == ev.ev_extra1:
                ### COMMIT EVENT IF IT IS PROCESSING ON TARGET SERVER
                ev.tag_done()
                cur = dst_db.cursor()
                ### PASS VALUES AS QUERY PARAMETERS, NOT VIA STRING FORMATTING
                cur.execute("select __data_copier(%s, %s)",
                            (ev.ev_extra1, ev.ev_extra2))
                records = cur.fetchall()
                dst_db.commit()

if __name__ == '__main__':
    script = Copier(sys.argv[1:])
    script.start()
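
The script expects a "node" entry in its .ini file. The sketch below shows what such a file might contain and reads it back with Python 3's standard configparser instead of Skytools. Only the "node" key and the "pull_ticker", "src_db" and "dst_db" names come from the script above; the other keys, the connection strings and the host name are assumptions that depend on the Skytools version and the local setup.

```python
import configparser

# Hypothetical config for the consumer running on node 3.
EXAMPLE_INI = """
[pull_ticker]
job_name = pull_node3
src_db = dbname=hub host=hub.example.com
dst_db = dbname=node3
pgq_queue_name = pull_queue
node = 3
"""

parser = configparser.ConfigParser()
parser.read_string(EXAMPLE_INI)

# The value is read as a string, like the ev_extra1 value it is compared with.
node = parser.get("pull_ticker", "node")
print(repr(node))
```

Each node gets its own copy of the file with a different "node" value.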
