From fc921af92f17b6b7b6c2822ba0503b4bdc41d757 Mon Sep 17 00:00:00 2001 From: Petr Jelinek Date: Thu, 15 Dec 2016 15:20:08 +0100 Subject: [PATCH] Logical replication support for initial data copy --- contrib/file_fdw/file_fdw.c | 5 +- doc/src/sgml/catalogs.sgml | 79 +++ doc/src/sgml/config.sgml | 25 + doc/src/sgml/logical-replication.sgml | 55 +- doc/src/sgml/monitoring.sgml | 9 +- doc/src/sgml/ref/alter_subscription.sgml | 50 +- doc/src/sgml/ref/create_subscription.sgml | 39 ++ src/backend/catalog/Makefile | 1 + src/backend/catalog/heap.c | 6 + src/backend/catalog/pg_subscription.c | 291 +++++++++ src/backend/catalog/system_views.sql | 1 + src/backend/commands/copy.c | 23 +- src/backend/commands/subscriptioncmds.c | 391 +++++++++-- src/backend/parser/gram.y | 26 +- src/backend/postmaster/pgstat.c | 6 + .../libpqwalreceiver/libpqwalreceiver.c | 229 ++++++- src/backend/replication/logical/Makefile | 2 +- src/backend/replication/logical/launcher.c | 99 ++- src/backend/replication/logical/logical.c | 12 +- src/backend/replication/logical/relation.c | 7 + src/backend/replication/logical/snapbuild.c | 85 ++- src/backend/replication/logical/tablesync.c | 727 +++++++++++++++++++++ src/backend/replication/logical/worker.c | 154 ++++- src/backend/replication/pgoutput/pgoutput.c | 1 + src/backend/replication/repl_gram.y | 25 +- src/backend/replication/repl_scanner.l | 4 +- src/backend/replication/walsender.c | 67 +- src/backend/tcop/postgres.c | 5 +- src/backend/utils/adt/misc.c | 20 + src/backend/utils/cache/syscache.c | 23 + src/backend/utils/misc/guc.c | 12 + src/include/catalog/indexing.h | 6 + src/include/catalog/pg_proc.h | 5 +- src/include/catalog/pg_subscription_rel.h | 77 +++ src/include/commands/copy.h | 5 +- src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 12 + src/include/nodes/replnodes.h | 5 + src/include/pgstat.h | 4 +- src/include/replication/logical.h | 15 +- src/include/replication/logicallauncher.h | 1 + src/include/replication/snapbuild.h | 1 + src/include/replication/walreceiver.h | 19 + src/include/replication/walsender.h | 2 +- src/include/replication/worker_internal.h | 20 +- src/include/utils/builtins.h | 1 + src/include/utils/syscache.h | 2 + src/test/README | 3 + src/test/regress/expected/rules.out | 3 +- src/test/regress/expected/sanity_check.out | 1 + src/test/regress/expected/subscription.out | 46 +- src/test/regress/sql/subscription.sql | 9 +- src/test/subscription/t/001_rep_changes.pl | 36 +- src/test/subscription/t/002_types.pl | 6 + src/test/subscription/t/003_sync.pl | 159 +++++ 55 files changed, 2645 insertions(+), 273 deletions(-) create mode 100644 src/backend/replication/logical/tablesync.c create mode 100644 src/include/catalog/pg_subscription_rel.h create mode 100644 src/test/subscription/t/003_sync.pl diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c index 735b794..277639f 100644 --- a/contrib/file_fdw/file_fdw.c +++ b/contrib/file_fdw/file_fdw.c @@ -662,6 +662,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags) node->ss.ss_currentRelation, filename, is_program, + NULL, NIL, options); @@ -737,6 +738,7 @@ fileReScanForeignScan(ForeignScanState *node) node->ss.ss_currentRelation, festate->filename, festate->is_program, + NULL, NIL, festate->options); } @@ -1100,7 +1102,8 @@ file_acquire_sample_rows(Relation onerel, int elevel, /* * Create CopyState from FDW options. 
*/ - cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NIL, options); + cstate = BeginCopyFrom(NULL, onerel, filename, is_program, NULL, NIL, + options); /* * Use per-tuple memory context to prevent leak of memory used to read diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 85bea7e..29bcea4 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -301,6 +301,11 @@ + pg_subscription_rel + relation state mapping for subscriptions + + + pg_tablespace tablespaces within this database cluster @@ -6400,6 +6405,80 @@ + + <structname>pg_subscription_rel</structname> + + + pg_subscription_rel + + + + The catalog pg_subscription_rel contains the + status for each replicated relation in each subscription. This is a + many-to-many mapping. + + + + This catalog only contains tables known to subscription after running + either CREATE SUBSCRIPTION or + ALTER SUBSCRIPTION ... REFRESH commands. + + + + <structname>pg_subscription_rel</structname> Columns + + + + + Name + Type + References + Description + + + + + + srsubid + oid + pg_subscription.oid + Reference to subscription + + + + srrelid + oid + pg_class.oid + Reference to relation + + + + srsubstate + char + + + i = initialize, + d = data is being copied, + w = waiting for synchronization with apply worker, + c = catching up, + s = synchronized, + r = ready (normal replication) + + + + + srsublsn + pg_lsn + + + End LSN for w, c and s states. + + + + +
+
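A minimal sketch of how this catalog can be inspected, assuming the column layout described above together with the standard pg_subscription and pg_class catalogs:

    -- Sketch: list per-table synchronization state for every subscription.
    SELECT s.subname, c.relname, sr.srsubstate, sr.srsublsn
      FROM pg_subscription_rel sr
      JOIN pg_subscription s ON s.oid = sr.srsubid
      JOIN pg_class c ON c.oid = sr.srrelid
     ORDER BY s.subname, c.relname;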
+ <structname>pg_tablespace</structname> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index fb5d647..17d9645 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -3449,6 +3449,31 @@ ANY num_sync ( + max_sync_workers_per_subscription (integer) + + max_sync_workers_per_subscription configuration parameter + + + + + Maximum number of synchronization workers per subscription. This + parameter controls the amount of parallelism of the initial data copy + during the subscription initialization or when new tables are added. + + + Currently, there can be only one synchronization worker per table. + + + The synchronization workers are taken from the pool defined by + max_logical_replication_workers. + + + The default value is 2. + + + + diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 32c132f..665fed4 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -22,11 +22,13 @@ cascading replication or more complex configurations. - Logical replication sends the changes on the publisher to the subscriber - as they occur in real-time. The subscriber applies the data in the same - order as the publisher so that transactional consistency is guaranteed - for publications within a single subscription. This method of data - replication is sometimes referred to as transactional replication. + Logical replication typically starts with a snapshot of the data on + the publisher database. Once that is done, the changes on the publisher + are sent to the subscriber as they occur in real-time. The subscriber + applies the data in the same order as the publisher so that + transactional consistency is guaranteed for publications within a + single subscription. This method of data replication is sometimes + referred to as transactional replication. The typical use-cases for logical replication are: @@ -142,7 +144,9 @@ Each subscription will receive changes via one replication slot (see - ). + ). Additional temporary + replication slots may be required for the initial data synchronization + of pre-existing table data. Subscriptions are not dumped by pg_dump by default but can be @@ -234,6 +238,21 @@ session_replication_role set to replica, which produces the usual effects on triggers and constraints. + + Initial Snapshot + + The initial data in existing subscribed tables are snapshotted and + copied in a parallel instance of a special kind of apply process. + This process will create its own temporary replication slot and + copy the existing data. Once existing data is copied, the worker + enters synchronization mode, which ensures that the table is brought + up to a synchronized state with the main apply process by streaming + any changes that happened during the initial data copy using standard + logical replication. Once the synchronization is done, the control + of the replication of the table is given back to the main apply + process where the replication continues as normal. + + Monitoring @@ -253,7 +272,9 @@ Normally, there is a single apply process running for an enabled subscription. A disabled subscription or a crashed subscription will - have zero rows in this view. + have zero rows in this view. If the initial data synchronization of + any table is in progress, there will be additional workers for the + tables being synchronized.
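A minimal monitoring sketch, assuming the columns listed in the patched pg_stat_subscription definition; the main apply worker reports a NULL relid, while each table synchronization worker reports the OID of the table it is copying:

    -- Sketch: one row for the main apply worker, plus one row per active
    -- table synchronization worker.
    SELECT subname, pid, relid, received_lsn, last_msg_receipt_time
      FROM pg_stat_subscription
     ORDER BY subname, relid NULLS FIRST;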
@@ -292,19 +313,19 @@ On the publisher side, wal_level must be set to logical, and max_replication_slots has to be set to - at least the number of subscriptions expected to connect. And - max_wal_senders should be set to at least the same as - max_replication_slots plus the number of physical replicas - that are connected at the same time. + at least the number of subscriptions expected to connect with some reserve + for table synchronization. And max_wal_senders + should be set to at least the same as max_replication_slots plus + the number of physical replicas that are connected at the same time. The subscriber also requires the max_replication_slots to be set. In this case it should be set to at least the number of subscriptions that will be added to the subscriber. max_logical_replication_workers has to be set to at least - the number of subscriptions. Additionally the - max_worker_processes may need to be adjusted to accommodate - for replication workers, at least + the number of subscriptions, again with some reserve for the table + synchronization. Additionally the max_worker_processes may + need to be adjusted to accommodate for replication workers, at least (max_logical_replication_workers + 1). Note that some extensions and parallel queries also take worker slots from max_worker_processes. @@ -343,8 +364,10 @@ CREATE SUBSCRIPTION mysub CONNECTION 'dbname=foo host=bar user=repuser' PUBLICAT - The above will start the replication process of changes to - users and departments tables. + The above will start the replication process, which synchronizes the + initial table contents of users and + departments tables and then starts replicating + incremental changes to those tables. diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 01fad38..91be50c 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1581,6 +1581,12 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i Process ID of the subscription worker process + relid + Oid + Relation id which the worker is synchronizing, this is always + NULL for the main apply worker + + received_lsn pg_lsn Last transaction log position received, the initial value of @@ -1616,7 +1622,8 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i The pg_stat_subscription view will contain one row per subscription for main worker (with null PID if the worker is - not running). + not running), and additional rows for workers handling the initial data + copy of the subscribed tables. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 032ecbb..9b961bc 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -21,15 +21,21 @@ PostgreSQL documentation -ALTER SUBSCRIPTION name WITH ( option [, ... ] ) ] +ALTER SUBSCRIPTION name WITH ( suboption [, ... ] ) ] -where option can be: +where suboption can be: - SLOT NAME = slot_name + SLOT NAME = slot_name + +ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] WITH ( puboption [, ... ] ) +ALTER SUBSCRIPTION name REFRESH PUBLICATION WITH ( puboption [, ... ] ) + +where puboption can be: + + COPY DATA | NOCOPY DATA ALTER SUBSCRIPTION name OWNER TO { new_owner | CURRENT_USER | SESSION_USER } ALTER SUBSCRIPTION name CONNECTION 'conninfo' -ALTER SUBSCRIPTION name SET PUBLICATION publication_name [, ...] 
ALTER SUBSCRIPTION name ENABLE ALTER SUBSCRIPTION name DISABLE @@ -65,7 +71,6 @@ ALTER SUBSCRIPTION name DISABLE CONNECTION 'conninfo' - SET PUBLICATION publication_name SLOT NAME = slot_name @@ -77,6 +82,40 @@ ALTER SUBSCRIPTION name DISABLE + SET PUBLICATION publication_name + + + Changes list of subscribed publications. See + for more information. + + + This clause will also execute REFRESH PUBLICATION. + + + + + + REFRESH PUBLICATION + + + Fetch missing table info from publisher. This will start replication + of tables that were added to subscribed publications since last + invocation of REFRESH PUBLICATION or since the + CREATE SUBSCRIPTION. Any existing data in those + tables will be copied as well in same manner in which + CREATE SUBSCRIPTION with + COPY DATA copies them. + + + The COPY DATA and NOCOPY DATA + options specify if the existing data in the publication that are being + subscribed should be copied. COPY DATA is the + default. + + + + + ENABLE @@ -95,6 +134,7 @@ ALTER SUBSCRIPTION name DISABLE + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 40d08b3..4358730 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -28,6 +28,8 @@ CREATE SUBSCRIPTION subscription_name @@ -123,6 +125,43 @@ CREATE SUBSCRIPTION subscription_name + + + COPY DATA + NOCOPY DATA + + + Specifies if the existing data in the publication that are being + subscribed should be copied once the replication starts. + COPY DATA is the default. + + + + + + SKIP CONNECT + + + Instructs the CREATE SUBSCRIPTION to skip initial + connection to the provider. This will change defaults to + DISABLED, NOCREATE SLOT and + COPY DATA. + + + It's not allowed to combine SKIP CONNECT and + ENABLED, CREATE SLOT or + COPY DATA. + + + Since no connection is made when this option is specified the tables + are not subscribed, so after you enable the subscription nothing will + be replicated. It is required to run + ALTER SUBSCRIPTION ... REFRESH PUBLICATION in order for + tables to be subscribed. + + + + diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 3136858..159cab5 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -44,6 +44,7 @@ POSTGRES_BKI_SRCS = $(addprefix $(top_srcdir)/src/include/catalog/,\ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ + pg_subscription_rel.h toasting.h indexing.h \ toasting.h indexing.h \ ) diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c index 72aa0dd..71ecf01 100644 --- a/src/backend/catalog/heap.c +++ b/src/backend/catalog/heap.c @@ -52,6 +52,7 @@ #include "catalog/pg_opclass.h" #include "catalog/pg_partitioned_table.h" #include "catalog/pg_statistic.h" +#include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_type.h" #include "catalog/pg_type_fn.h" @@ -1852,6 +1853,11 @@ heap_drop_with_catalog(Oid relid) relation_close(rel, NoLock); /* + * Remove any associated relation synchronization states. 
+ */ + RemoveSubscriptionRel(InvalidOid, relid); + + /* * Forget any ON COMMIT action for the rel */ remove_on_commit_action(relid); diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index eae5063..44bb9b2 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -18,15 +18,20 @@ #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" +#include "access/xact.h" +#include "catalog/indexing.h" #include "catalog/pg_type.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" #include "nodes/makefuncs.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/fmgroids.h" +#include "utils/pg_lsn.h" +#include "utils/rel.h" #include "utils/syscache.h" @@ -205,3 +210,289 @@ textarray_to_stringlist(ArrayType *textarray) return res; } + +/* + * Set the state of a subscription table. + */ +Oid +SetSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) +{ + Relation rel; + HeapTuple tup; + Oid subrelid; + bool nulls[Natts_pg_subscription_rel]; + Datum values[Natts_pg_subscription_rel]; + + /* Prevent concurrent changes. */ + rel = heap_open(SubscriptionRelRelationId, ShareRowExclusiveLock); + + /* Try finding existing mapping. */ + tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(subid)); + + /* + * If the record for given table does not exist yet create new + * record, otherwise update the existing one. + */ + if (!HeapTupleIsValid(tup)) + { + /* Form the tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid); + values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + if (sublsn != InvalidXLogRecPtr) + values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); + else + nulls[Anum_pg_subscription_rel_srsublsn - 1] = true; + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog. */ + subrelid = simple_heap_insert(rel, tup); + CatalogUpdateIndexes(rel, tup); + + heap_freetuple(tup); + } + else + { + bool replaces[Natts_pg_subscription_rel]; + + /* Update the tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, true, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + replaces[Anum_pg_subscription_rel_srsubstate - 1] = true; + nulls[Anum_pg_subscription_rel_srsubstate - 1] = false; + values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state); + + replaces[Anum_pg_subscription_rel_srsublsn - 1] = true; + if (sublsn != InvalidXLogRecPtr) + { + nulls[Anum_pg_subscription_rel_srsublsn - 1] = false; + values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn); + } + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + simple_heap_update(rel, &tup->t_self, tup); + CatalogUpdateIndexes(rel, tup); + + subrelid = HeapTupleGetOid(tup); + } + + /* Cleanup. */ + heap_close(rel, NoLock); + + /* Make the changes visible. */ + CommandCounterIncrement(); + + return subrelid; +} + +/* + * Get state of subscription table. + * + * Returns SUBREL_STATE_UNKNOWN when not found and missing_ok is true. 
+ */ +char +GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn, + bool missing_ok) +{ + Relation rel; + HeapTuple tup; + char substate; + bool isnull; + Datum d; + + rel = heap_open(SubscriptionRelRelationId, AccessShareLock); + + /* Try finding the mapping. */ + tup = SearchSysCache2(SUBSCRIPTIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(subid)); + + if (!HeapTupleIsValid(tup)) + { + if (missing_ok) + { + heap_close(rel, RowExclusiveLock); + *sublsn = InvalidXLogRecPtr; + return '\0'; + } + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("subscription table %u in subscription %u does not exist", + relid, subid))); + } + + /* Get the state. */ + d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, + Anum_pg_subscription_rel_srsubstate, &isnull); + Assert(!isnull); + substate = DatumGetChar(d); + d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup, + Anum_pg_subscription_rel_srsublsn, &isnull); + if (isnull) + *sublsn = InvalidXLogRecPtr; + else + *sublsn = DatumGetLSN(d); + + /* Cleanup */ + ReleaseSysCache(tup); + heap_close(rel, AccessShareLock); + + return substate; +} + +/* + * Drop subscription relation mapping. These can be for a particular + * subscription, or for a particular relation, or both. + */ +void +RemoveSubscriptionRel(Oid subid, Oid relid) +{ + Relation rel; + HeapScanDesc scan; + ScanKeyData skey[2]; + HeapTuple tup; + int nkeys = 0; + + /* Prevent concurrent changes (see SetSubscriptionRelState()). */ + rel = heap_open(SubscriptionRelRelationId, ShareRowExclusiveLock); + + if (OidIsValid(subid)) + { + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, + F_OIDEQ, + ObjectIdGetDatum(subid)); + } + + if (OidIsValid(relid)) + { + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srrelid, + BTEqualStrategyNumber, + F_OIDEQ, + ObjectIdGetDatum(relid)); + } + + /* Do the search and delete what we found. */ + scan = heap_beginscan_catalog(rel, nkeys, skey); + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + { + simple_heap_delete(rel, &tup->t_self); + } + heap_endscan(scan); + + heap_close(rel, ShareRowExclusiveLock); +} + + +/* + * Get all relations for subscription. + * + * Returned list is palloced in current memory context. + */ +List * +GetSubscriptionRelations(Oid subid) +{ + List *res = NIL; + Relation rel; + HeapTuple tup; + int nkeys = 0; + ScanKeyData skey[2]; + SysScanDesc scan; + + rel = heap_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, nkeys, skey); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + SubscriptionRelState *relstate; + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + relstate = (SubscriptionRelState *)palloc(sizeof(SubscriptionRelState)); + relstate->relid = subrel->srrelid; + relstate->state = subrel->srsubstate; + relstate->lsn = subrel->srsublsn; + + res = lappend(res, relstate); + } + + /* Cleanup */ + systable_endscan(scan); + heap_close(rel, AccessShareLock); + + return res; +} + +/* + * Get all relations for subscription that are not in a ready state. + * + * Returned list is palloced in current memory context. 
+ */ +List * +GetSubscriptionNotReadyRelations(Oid subid) +{ + List *res = NIL; + Relation rel; + HeapTuple tup; + int nkeys = 0; + ScanKeyData skey[2]; + SysScanDesc scan; + + rel = heap_open(SubscriptionRelRelationId, AccessShareLock); + + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(subid)); + + ScanKeyInit(&skey[nkeys++], + Anum_pg_subscription_rel_srsubstate, + BTEqualStrategyNumber, F_CHARNE, + CharGetDatum(SUBREL_STATE_READY)); + + scan = systable_beginscan(rel, InvalidOid, false, + NULL, nkeys, skey); + + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_subscription_rel subrel; + SubscriptionRelState *relstate; + + subrel = (Form_pg_subscription_rel) GETSTRUCT(tup); + + relstate = (SubscriptionRelState *)palloc(sizeof(SubscriptionRelState)); + relstate->relid = subrel->srrelid; + relstate->state = subrel->srsubstate; + relstate->lsn = subrel->srsublsn; + + res = lappend(res, relstate); + } + + /* Cleanup */ + systable_endscan(scan); + heap_close(rel, AccessShareLock); + + return res; +} diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b09ccc3..a335747 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -722,6 +722,7 @@ CREATE VIEW pg_stat_subscription AS su.oid AS subid, su.subname, st.pid, + st.relid, st.received_lsn, st.last_msg_send_time, st.last_msg_receipt_time, diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 1fd2162..54ca3a3 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -60,7 +60,8 @@ typedef enum CopyDest { COPY_FILE, /* to/from file (or a piped program) */ COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ - COPY_NEW_FE /* to/from frontend (3.0 protocol) */ + COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ + COPY_CALLBACK /* to/from callback function */ } CopyDest; /* @@ -109,6 +110,7 @@ typedef struct CopyStateData List *attnumlist; /* integer list of attnums to copy */ char *filename; /* filename, or NULL for STDIN/STDOUT */ bool is_program; /* is 'filename' a program to popen? */ + copy_data_source_cb data_source_cb; /* function for reading data*/ bool binary; /* binary format? */ bool oids; /* include OIDs? */ bool freeze; /* freeze rows on loading? */ @@ -300,7 +302,6 @@ static uint64 DoCopyTo(CopyState cstate); static uint64 CopyTo(CopyState cstate); static void CopyOneRowTo(CopyState cstate, Oid tupleOid, Datum *values, bool *nulls); -static uint64 CopyFrom(CopyState cstate); static void CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid, int hi_options, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, @@ -530,6 +531,9 @@ CopySendEndOfRow(CopyState cstate) /* Dump the accumulated row as one CopyData message */ (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len); break; + case COPY_CALLBACK: + Assert(false); /* Not yet supported. 
*/ + break; } resetStringInfo(fe_msgbuf); @@ -644,6 +648,9 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread) bytesread += avail; } break; + case COPY_CALLBACK: + bytesread = cstate->data_source_cb(databuf, minread, maxread); + break; } return bytesread; @@ -969,7 +976,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, PreventCommandIfParallelMode("COPY FROM"); cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, - stmt->attlist, stmt->options); + NULL, stmt->attlist, stmt->options); cstate->range_table = range_table; *processed = CopyFrom(cstate); /* copy from file to database */ EndCopyFrom(cstate); @@ -2286,7 +2293,7 @@ limit_printout_length(const char *str) /* * Copy FROM file to relation. */ -static uint64 +uint64 CopyFrom(CopyState cstate) { HeapTuple tuple; @@ -2867,6 +2874,7 @@ BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, bool is_program, + copy_data_source_cb data_source_cb, List *attnamelist, List *options) { @@ -2981,7 +2989,12 @@ BeginCopyFrom(ParseState *pstate, cstate->num_defaults = num_defaults; cstate->is_program = is_program; - if (pipe) + if (data_source_cb) + { + cstate->copy_dest = COPY_CALLBACK; + cstate->data_source_cb = data_source_cb; + } + else if (pipe) { Assert(!is_program); /* the grammar does not allow this */ if (whereToSendOutput == DestRemote) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 56b254e..1bdcd5a 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -18,11 +18,14 @@ #include "access/heapam.h" #include "access/htup_details.h" +#include "catalog/dependency.h" #include "catalog/indexing.h" +#include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_type.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" #include "commands/defrem.h" #include "commands/event_trigger.h" @@ -36,9 +39,12 @@ #include "storage/lmgr.h" #include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" +static int oid_cmp(const void *p1, const void *p2); + /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. * @@ -47,17 +53,17 @@ * accomodate that. 
*/ static void -parse_subscription_options(List *options, char **conninfo, - List **publications, bool *enabled_given, - bool *enabled, bool *create_slot, char **slot_name) +parse_subscription_options(List *options, bool *connect, bool *enabled_given, + bool *enabled, bool *create_slot, char **slot_name, + bool *copy_data) { ListCell *lc; + bool connect_given = false; bool create_slot_given = false; + bool copy_data_given = false; - if (conninfo) - *conninfo = NULL; - if (publications) - *publications = NIL; + if (connect) + *connect = true; if (enabled) { *enabled_given = false; @@ -67,29 +73,23 @@ parse_subscription_options(List *options, char **conninfo, *create_slot = true; if (slot_name) *slot_name = NULL; + if (copy_data) + *copy_data = true; /* Parse options */ foreach (lc, options) { DefElem *defel = (DefElem *) lfirst(lc); - if (strcmp(defel->defname, "conninfo") == 0 && conninfo) + if (strcmp(defel->defname, "skip connect") == 0 && connect) { - if (*conninfo) + if (connect_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *conninfo = defGetString(defel); - } - else if (strcmp(defel->defname, "publication") == 0 && publications) - { - if (*publications) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); - - *publications = defGetStringList(defel); + connect_given = true; + *connect = !defGetBoolean(defel); } else if (strcmp(defel->defname, "enabled") == 0 && enabled) { @@ -140,9 +140,57 @@ parse_subscription_options(List *options, char **conninfo, *slot_name = defGetString(defel); } + else if (strcmp(defel->defname, "copy data") == 0 && copy_data) + { + if (copy_data_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + copy_data_given = true; + *copy_data = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "nocopy data") == 0 && copy_data) + { + if (copy_data_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + + copy_data_given = true; + *copy_data = !defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } + + /* + * We've been explicitly asked to not connect, that requires some + * additional processing. + */ + if (connect && !*connect) + { + /* Check for incompatible options from the user. */ + if (*enabled_given && *enabled) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("noconnect and enabled are mutually exclusive options"))); + + if (create_slot_given && *create_slot) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("noconnect and create slot are mutually exclusive options"))); + + if (copy_data_given && *copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("noconnect and copy data are mutually exclusive options"))); + + /* Change the defaults of other options. */ + *enabled = false; + *create_slot = false; + *copy_data = false; + } } /* @@ -211,8 +259,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt) bool nulls[Natts_pg_subscription]; Datum values[Natts_pg_subscription]; HeapTuple tup; + bool connect; bool enabled_given; bool enabled; + bool copy_data; char *conninfo; char *slotname; char originname[NAMEDATALEN]; @@ -241,9 +291,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt) * Parse and check options. * Connection and publication should not be specified here. 
*/ - parse_subscription_options(stmt->options, NULL, NULL, - &enabled_given, &enabled, - &create_slot, &slotname); + parse_subscription_options(stmt->options, &connect, &enabled_given, + &enabled, &create_slot, &slotname, ©_data); if (slotname == NULL) slotname = stmt->subname; @@ -283,14 +332,17 @@ CreateSubscription(CreateSubscriptionStmt *stmt) replorigin_create(originname); /* - * If requested, create the replication slot on remote side for our - * newly created subscription. + * Connect to remote side to execute requested commands and fetch table + * info. */ - if (create_slot) + if (connect) { XLogRecPtr lsn; char *err; WalReceiverConn *wrconn; + List *tables; + ListCell *lc; + char table_state; /* Try to connect to the publisher. */ wrconn = walrcv_connect(conninfo, true, stmt->subname, &err); @@ -298,14 +350,47 @@ CreateSubscription(CreateSubscriptionStmt *stmt) ereport(ERROR, (errmsg("could not connect to the publisher: %s", err))); - walrcv_create_slot(wrconn, slotname, false, &lsn); + /* + * If requested, create the replication slot on remote side for our + * newly created subscription. + */ + if (create_slot) + { + walrcv_create_slot(wrconn, slotname, false, &lsn); + ereport(NOTICE, + (errmsg("created replication slot \"%s\" on publisher", + slotname))); + } + + /* Set sync state based on if we were asked to do data copy or not. */ + table_state = copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY; + /* + * Get the table list from publisher and build local table status + * info. + */ + tables = walrcv_table_list(wrconn, publications); + foreach (lc, tables) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, true); + + SetSubscriptionRelState(subid, relid, table_state, + InvalidXLogRecPtr); + } + ereport(NOTICE, - (errmsg("created replication slot \"%s\" on publisher", - slotname))); + (errmsg("synchronized table states"))); /* And we are done with the remote side. */ walrcv_disconnect(wrconn); } + else + ereport(WARNING, + (errmsg("tables were not subscribed, you will have to run " + "ALTER SUBSCRIPTION ... REFRESH PUBLICATION to " + "subscribe the tables"))); heap_close(rel, RowExclusiveLock); @@ -318,6 +403,108 @@ CreateSubscription(CreateSubscriptionStmt *stmt) return myself; } +static void +AlterSubscription_refresh(Subscription *sub, bool copy_data) +{ + char *err; + List *pubrel_names; + List *subrel_states; + Oid *subrel_local_oids; + Oid *pubrel_local_oids; + ListCell *lc; + int off; + + /* Load the library providing us libpq calls. */ + load_file("libpqwalreceiver", false); + + /* Try to connect to the publisher. */ + wrconn = walrcv_connect(sub->conninfo, true, sub->name, &err); + if (!wrconn) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* Get the table list from publisher. */ + pubrel_names = walrcv_table_list(wrconn, sub->publications); + + /* We are done with the remote side, close connection. */ + walrcv_disconnect(wrconn); + + /* Get local table list. */ + subrel_states = GetSubscriptionRelations(sub->oid); + + /* + * Build qsorted array of local table oids for faster lookup. + * This can potentially contain all tables in the database so + * speed of lookup is important. 
+ */ + subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + off = 0; + foreach(lc, subrel_states) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc); + subrel_local_oids[off++] = relstate->relid; + } + qsort(subrel_local_oids, list_length(subrel_states), + sizeof(Oid), oid_cmp); + + /* + * Walk over the remote tables and try to match them to locally + * known tables. If the table is not known locally create a new state + * for it. + * + * Also builds array of local oids of remote tables for the next step. + */ + off = 0; + pubrel_local_oids = palloc(list_length(pubrel_names) * sizeof(Oid)); + + foreach (lc, pubrel_names) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + pubrel_local_oids[off++] = relid; + + if (!bsearch(&relid, subrel_local_oids, + list_length(subrel_states), sizeof(Oid), oid_cmp)) + { + SetSubscriptionRelState(sub->oid, relid, + copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, + InvalidXLogRecPtr); + ereport(NOTICE, + (errmsg("added subscription for table %s.%s", + quote_identifier(rv->schemaname), + quote_identifier(rv->relname)))); + } + } + + /* + * Next remove state for tables we should not care about anymore using + * the data we collected above + */ + qsort(pubrel_local_oids, list_length(pubrel_names), + sizeof(Oid), oid_cmp); + + for (off = 0; off < list_length(subrel_states); off++) + { + Oid relid = subrel_local_oids[off]; + + if (!bsearch(&relid, pubrel_local_oids, + list_length(pubrel_names), sizeof(Oid), oid_cmp)) + { + char *namespace; + + RemoveSubscriptionRel(sub->oid, relid); + + namespace = get_namespace_name(get_rel_namespace(relid)); + ereport(NOTICE, + (errmsg("removed subscription for table %s.%s", + quote_identifier(namespace), + quote_identifier(get_rel_name(relid))))); + } + } +} + /* * Alter the existing subscription. */ @@ -331,11 +518,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt) Datum values[Natts_pg_subscription]; HeapTuple tup; Oid subid; - bool enabled_given; - bool enabled; - char *conninfo; - char *slot_name; - List *publications; + bool update_tuple = false; rel = heap_open(SubscriptionRelationId, RowExclusiveLock); @@ -356,53 +539,108 @@ AlterSubscription(AlterSubscriptionStmt *stmt) subid = HeapTupleGetOid(tup); - /* Parse options. */ - parse_subscription_options(stmt->options, &conninfo, &publications, - &enabled_given, &enabled, - NULL, &slot_name); - /* Form a new tuple. 
*/ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); - if (enabled_given) - { - values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); - replaces[Anum_pg_subscription_subenabled - 1] = true; - } - if (conninfo) + switch (stmt->kind) { - values[Anum_pg_subscription_subconninfo - 1] = - CStringGetTextDatum(conninfo); - replaces[Anum_pg_subscription_subconninfo - 1] = true; - } - if (slot_name) - { - values[Anum_pg_subscription_subslotname - 1] = - DirectFunctionCall1(namein, CStringGetDatum(slot_name)); - replaces[Anum_pg_subscription_subslotname - 1] = true; - } - if (publications != NIL) - { - values[Anum_pg_subscription_subpublications - 1] = - publicationListToArray(publications); - replaces[Anum_pg_subscription_subpublications - 1] = true; + case ALTER_SUBSCRIPTION_OPTIONS: + { + char *slot_name; + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, &slot_name, NULL); + + values[Anum_pg_subscription_subslotname - 1] = + DirectFunctionCall1(namein, CStringGetDatum(slot_name)); + replaces[Anum_pg_subscription_subslotname - 1] = true; + + update_tuple = true; + break; + } + + case ALTER_SUBSCRIPTION_ENABLED: + { + bool enabled, + enabled_given; + + parse_subscription_options(stmt->options, NULL, + &enabled_given, &enabled, NULL, + NULL, NULL); + Assert(enabled_given); + + values[Anum_pg_subscription_subenabled - 1] = + BoolGetDatum(enabled); + replaces[Anum_pg_subscription_subenabled - 1] = true; + + update_tuple = true; + break; + } + + case ALTER_SUBSCRIPTION_CONNECTION: + values[Anum_pg_subscription_subconninfo - 1] = + CStringGetTextDatum(stmt->conninfo); + replaces[Anum_pg_subscription_subconninfo - 1] = true; + update_tuple = true; + break; + + case ALTER_SUBSCRIPTION_PUBLICATION: + { + bool copy_data; + Subscription *sub = GetSubscription(subid, false); + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, ©_data); + + values[Anum_pg_subscription_subpublications - 1] = + publicationListToArray(stmt->publication); + replaces[Anum_pg_subscription_subpublications - 1] = true; + + update_tuple = true; + + /* Make sure refresh sees the new list of publications. */ + sub->publications = stmt->publication; + AlterSubscription_refresh(sub, copy_data); + + break; + } + + case ALTER_SUBSCRIPTION_REFRESH: + { + bool copy_data; + Subscription *sub = GetSubscription(subid, false); + + parse_subscription_options(stmt->options, NULL, NULL, NULL, + NULL, NULL, ©_data); + + AlterSubscription_refresh(sub, copy_data); + + break; + } + + default: + elog(ERROR, "unrecognized ALTER SUBSCRIPTION kind %d", + stmt->kind); } - tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, - replaces); + /* Update the catalog if needed. */ + if (update_tuple) + { + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); - /* Update the catalog. */ - simple_heap_update(rel, &tup->t_self, tup); - CatalogUpdateIndexes(rel, tup); + simple_heap_update(rel, &tup->t_self, tup); + CatalogUpdateIndexes(rel, tup); - ObjectAddressSet(myself, SubscriptionRelationId, subid); + heap_freetuple(tup); + } - /* Cleanup. */ - heap_freetuple(tup); heap_close(rel, RowExclusiveLock); + ObjectAddressSet(myself, SubscriptionRelationId, subid); + InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0); return myself; @@ -493,11 +731,14 @@ DropSubscription(DropSubscriptionStmt *stmt) ReleaseSysCache(tup); + /* Remove any associated relation synchronization states. 
*/ + RemoveSubscriptionRel(subid, InvalidOid); + /* Protect against launcher restarting the worker. */ LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE); /* Kill the apply worker so that the slot becomes accessible. */ - logicalrep_worker_stop(subid); + logicalrep_worker_stop(subid, InvalidOid); /* Remove the origin tracking if exists. */ snprintf(originname, sizeof(originname), "pg_%u", subid); @@ -691,3 +932,17 @@ RenameSubscription(RenameStmt *stmt) return address; } + +/* qsort comparison function */ +static int +oid_cmp(const void *p1, const void *p2) +{ + Oid v1 = *((const Oid *) p1); + Oid v2 = *((const Oid *) p2); + + if (v1 < v2) + return -1; + if (v1 > v2) + return 1; + return 0; +} diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 712dfdd..5803f8f 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9118,6 +9118,7 @@ AlterSubscriptionStmt: { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_OPTIONS; n->subname = $3; n->options = $5; $$ = (Node *)n; @@ -9126,24 +9127,35 @@ AlterSubscriptionStmt: { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_CONNECTION; n->subname = $3; - n->options = list_make1(makeDefElem("conninfo", - (Node *)makeString($5), @1)); + n->conninfo = $5; $$ = (Node *)n; } - | ALTER SUBSCRIPTION name SET PUBLICATION publication_name_list + | ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_REFRESH; n->subname = $3; - n->options = list_make1(makeDefElem("publication", - (Node *)$6, @1)); + n->options = $6; + $$ = (Node *)n; + } + | ALTER SUBSCRIPTION name SET PUBLICATION publication_name_list opt_definition + { + AlterSubscriptionStmt *n = + makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_PUBLICATION; + n->subname = $3; + n->publication = $6; + n->options = $7; $$ = (Node *)n; } | ALTER SUBSCRIPTION name ENABLE_P { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_ENABLED; n->subname = $3; n->options = list_make1(makeDefElem("enabled", (Node *)makeInteger(TRUE), @1)); @@ -9153,11 +9165,13 @@ AlterSubscriptionStmt: { AlterSubscriptionStmt *n = makeNode(AlterSubscriptionStmt); + n->kind = ALTER_SUBSCRIPTION_ENABLED; n->subname = $3; n->options = list_make1(makeDefElem("enabled", (Node *)makeInteger(FALSE), @1)); $$ = (Node *)n; - } ; + } + ; /***************************************************************************** * diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 7176cf1..231f029 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3398,6 +3398,12 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_LOGICAL_SYNC_DATA: + event_name = "LogicalSyncData"; + break; + case WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE: + event_name = "LogicalSyncStateChange"; + break; /* no default case, so that compiler will warn */ } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 7df3698..3c86816 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -23,6 +23,7 @@ #include "pqexpbuffer.h" #include "access/xlog.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" #include "pgstat.h" #include 
"replication/logicalproto.h" #include "replication/walreceiver.h" @@ -30,6 +31,8 @@ #include "utils/builtins.h" #include "utils/pg_lsn.h" +#define atooid(x) ((Oid) strtoul((x), NULL, 10)) + PG_MODULE_MAGIC; void _PG_init(void); @@ -70,6 +73,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, XLogRecPtr *lsn); static bool libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err); +static List *libpqrcv_table_list(WalReceiverConn *conn, List *publications); +static void libpqrcv_table_info(WalReceiverConn *conn, const char *nspname, + const char *relname, LogicalRepRelation *lrel); +static void libpqrcv_table_copy(WalReceiverConn *conn, + const char *nspname, const char *relname); static void libpqrcv_disconnect(WalReceiverConn *conn); static WalReceiverFunctionsType PQWalReceiverFunctions = { @@ -84,6 +92,9 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_send, libpqrcv_create_slot, libpqrcv_command, + libpqrcv_table_list, + libpqrcv_table_info, + libpqrcv_table_copy, libpqrcv_disconnect }; @@ -362,10 +373,8 @@ libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli) * next timeline's ID, or just CommandComplete if the server was shut * down. * - * If we had not yet received CopyDone from the backend, PGRES_COPY_IN - * would also be possible. However, at the moment this function is only - * called after receiving CopyDone from the backend - the walreceiver - * never terminates replication on its own initiative. + * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT + * is also possible in case we aborted the copy in mid-stream. */ res = PQgetResult(conn->streamConn); if (PQresultStatus(res) == PGRES_TUPLES_OK) @@ -545,6 +554,203 @@ libpqrcv_PQexec(PGconn *streamConn, const char *query) } /* + * Obtain list of tables that belong to given replication sets. + */ +static List * +libpqrcv_table_list(WalReceiverConn *conn, List *publications) +{ + StringInfoData cmd; + PGresult *res; + int i; + ListCell *lc; + bool first; + List *tablelist = NIL; + + Assert(conn->logical); + Assert(list_length(publications) > 0); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" + " FROM pg_catalog.pg_publication_tables t\n" + " WHERE t.pubname IN ("); + first = true; + foreach (lc, publications) + { + char *pubname = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoString(&cmd, ", "); + + appendStringInfo(&cmd, "%s", quote_literal_cstr(pubname)); + } + appendStringInfoString(&cmd, ")"); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive list of replicated tables from the publisher: %s", + PQerrorMessage(conn->streamConn)))); + } + if (PQnfields(res) != 2) + { + int nfields = PQnfields(res); + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from publisher"), + errdetail("Expected 2 fields, got %d fields.", nfields))); + } + + for (i = 0; i < PQntuples(res); i++) + { + RangeVar *rv; + + rv = makeRangeVar(pstrdup(PQgetvalue(res, i, 0)), + pstrdup(PQgetvalue(res, i, 1)), -1); + + tablelist = lappend(tablelist, rv); + } + + PQclear(res); + + return tablelist; +} + +/* + * Fetch table info of a named table from the publisher and fill the lrel. 
+ */ +static void +libpqrcv_table_info(WalReceiverConn *conn, const char *nspname, + const char *relname, LogicalRepRelation *lrel) +{ + StringInfoData cmd; + PGresult *res; + int i; + + Assert(conn->logical); + + /* First fetch the oid of the table. */ + initStringInfo(&cmd); + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" + " FROM pg_catalog.pg_class c," + " pg_catalog.pg_namespace n" + " WHERE n.nspname = %s" + " AND c.relname = %s" + " AND c.relkind = 'r'", + quote_literal_cstr(nspname), + quote_literal_cstr(relname)); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not fetch table info for table %s from publisher: %s", + quote_qualified_identifier(nspname, relname), + PQerrorMessage(conn->streamConn)))); + } + if (PQntuples(res) != 1) + { + PQclear(res); + ereport(ERROR, + (errmsg("table %s not found on publisher", + quote_qualified_identifier(nspname, relname)))); + } + + lrel->remoteid = atooid(PQgetvalue(res, 0, 0)); + lrel->replident = *PQgetvalue(res, 0, 1); + PQclear(res); + + lrel->nspname = pstrdup(nspname); + lrel->relname = pstrdup(relname); + + /* Now fetch columns. */ + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT a.attname," + " a.atttypid," + " a.atttypmod," + " a.attnum = ANY(i.indkey)" + " FROM pg_catalog.pg_attribute a" + " LEFT JOIN pg_catalog.pg_index i" + " ON (i.indexrelid = pg_get_replica_identity_index(%u))" + " WHERE a.attnum > 0::pg_catalog.int2" + " AND NOT a.attisdropped" + " AND a.attrelid = %u" + " ORDER BY a.attnum", + lrel->remoteid, lrel->remoteid); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not fetch table info for table %s: %s", + quote_qualified_identifier(nspname, relname), + PQerrorMessage(conn->streamConn)))); + } + if (PQnfields(res) != 4) + { + int nfields = PQnfields(res); + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from publisher"), + errdetail("Expected 4 fields, got %d fields.", nfields))); + } + + lrel->natts = PQntuples(res); + lrel->attnames = palloc(lrel->natts * sizeof(char *)); + lrel->atttyps = palloc(lrel->natts * sizeof(Oid)); + lrel->attkeys = NULL; + for (i = 0; i < lrel->natts; i++) + { + lrel->attnames[i] = pstrdup(PQgetvalue(res, i, 0)); + lrel->atttyps[i] = atooid(PQgetvalue(res, i, 1)); + if (strcmp(PQgetvalue(res, i, 3), "t") != 0) + lrel->attkeys = bms_add_member(lrel->attkeys, i); + } + PQclear(res); +} + +/* + * Start copy proccess of the existing data in a table. + */ +static void +libpqrcv_table_copy(WalReceiverConn *conn, const char *nspname, + const char *relname) +{ + StringInfoData cmd; + PGresult *res; + + Assert(conn->logical); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "COPY %s TO STDOUT", + quote_qualified_identifier(nspname, relname)); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_COPY_OUT) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not start initial table contents copy: %s", + PQerrorMessage(conn->streamConn)))); + } + PQclear(res); +} + +/* * Disconnect connection to primary, if any. 
*/ static void @@ -606,8 +812,19 @@ libpqrcv_receive(WalReceiverConn *conn, char **buffer, PGresult *res; res = PQgetResult(conn->streamConn); - if (PQresultStatus(res) == PGRES_COMMAND_OK || - PQresultStatus(res) == PGRES_COPY_IN) + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + + /* Verify that there are no more results */ + res = PQgetResult(conn->streamConn); + if (res != NULL) + ereport(ERROR, + (errmsg("unexpected result after CommandComplete: %s", + PQerrorMessage(conn->streamConn)))); + return -1; + } + else if (PQresultStatus(res) == PGRES_COPY_IN) { PQclear(res); return -1; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 259befa..bb417b0 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -15,6 +15,6 @@ include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) $(CPPFLAGS) OBJS = decode.o launcher.o logical.o logicalfuncs.o message.o origin.o \ - proto.o relation.o reorderbuffer.o snapbuild.o worker.o + proto.o relation.o reorderbuffer.o snapbuild.o tablesync.o worker.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index b5240dc..7ed0bfc 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -56,6 +56,8 @@ #define DEFAULT_NAPTIME_PER_CYCLE 180000L int max_logical_replication_workers = 4; +int max_sync_workers_per_subscription = 2; + LogicalRepWorker *MyLogicalRepWorker = NULL; typedef struct LogicalRepCtxStruct @@ -198,20 +200,22 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, /* * Walks the workers array and searches for one that matches given - * subscription id. + * subscription id and relid. */ LogicalRepWorker * -logicalrep_worker_find(Oid subid) +logicalrep_worker_find(Oid subid, Oid relid, bool only_running) { int i; LogicalRepWorker *res = NULL; Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + /* Search for attached worker for a given subscription id. */ for (i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && w->proc && IsBackendPid(w->proc->pid)) + if (w->subid == subid && w->relid == relid && + (!only_running || (w->proc && IsBackendPid(w->proc->pid)))) { res = w; break; @@ -225,7 +229,8 @@ logicalrep_worker_find(Oid subid) * Start new apply background worker. 
*/ void -logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) +logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, + Oid relid) { BackgroundWorker bgw; BackgroundWorkerHandle *bgw_handle; @@ -273,6 +278,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) worker->dbid = dbid; worker->userid = userid; worker->subid = subid; + worker->relid = relid; LWLockRelease(LogicalRepWorkerLock); @@ -284,6 +290,13 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication worker for subscription %u", subid); + if (OidIsValid(relid)) + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication worker %u sync %u", subid, relid); + else + snprintf(bgw.bgw_name, BGW_MAXLEN, + "logical replication worker %u", subid); + bgw.bgw_restart_time = BGW_NEVER_RESTART; bgw.bgw_notify_pid = MyProcPid; bgw.bgw_main_arg = slot; @@ -309,7 +322,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid) * not being started during this function call. */ void -logicalrep_worker_stop(Oid subid) +logicalrep_worker_stop(Oid subid, Oid relid) { LogicalRepWorker *worker; @@ -317,7 +330,7 @@ logicalrep_worker_stop(Oid subid) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid); + worker = logicalrep_worker_find(subid, relid, false); /* No worker, nothing to do. */ if (!worker) @@ -351,7 +364,7 @@ logicalrep_worker_stop(Oid subid) /* Check if the worker has started. */ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - worker = logicalrep_worker_find(subid); + worker = logicalrep_worker_find(subid, relid, false); if (!worker || worker->proc) break; } @@ -389,6 +402,22 @@ logicalrep_worker_stop(Oid subid) } /* + * Wake up (using latch) the logical replication worker. + */ +void +logicalrep_worker_wakeup(Oid subid, Oid relid) +{ + LogicalRepWorker *worker; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(subid, relid, true); + LWLockRelease(LogicalRepWorkerLock); + + if (worker) + SetLatch(&worker->proc->procLatch); +} + +/* * Attach to a slot. */ void @@ -451,6 +480,29 @@ logicalrep_worker_sigterm(SIGNAL_ARGS) } /* + * Count the number of registered (not necessarily running) sync workers + * for a subscription. + */ +int +logicalrep_sync_worker_count(Oid subid) +{ + int i; + int res = 0; + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + /* Search for attached worker for a given subscription id. */ + for (i = 0; i < max_logical_replication_workers; i++) + { + LogicalRepWorker *w = &LogicalRepCtx->workers[i]; + if (w->subid == subid && OidIsValid(w->relid)) + res++; + } + + return res; +} + +/* * ApplyLauncherShmemSize * Compute space needed for replication launcher shared memory */ @@ -603,12 +655,13 @@ ApplyLauncherMain(Datum main_arg) LogicalRepWorker *w; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); if (sub->enabled && w == NULL) { - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, sub->owner); + logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid); last_start_time = now; wait_time = wal_retrieve_retry_interval; /* Limit to one worker per mainloop cycle. 
*/ @@ -662,7 +715,7 @@ ApplyLauncherMain(Datum main_arg) Datum pg_stat_get_subscription(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_COLS 7 +#define PG_STAT_GET_SUBSCRIPTION_COLS 8 Oid subid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); int i; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; @@ -721,27 +774,31 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) MemSet(nulls, 0, sizeof(nulls)); values[0] = ObjectIdGetDatum(worker.subid); - values[1] = Int32GetDatum(worker_pid); + if (!OidIsValid(worker.relid)) + nulls[1] = true; + else + values[1] = ObjectIdGetDatum(worker.relid); + values[2] = Int32GetDatum(worker_pid); if (XLogRecPtrIsInvalid(worker.last_lsn)) - nulls[2] = true; + nulls[3] = true; else - values[2] = LSNGetDatum(worker.last_lsn); + values[3] = LSNGetDatum(worker.last_lsn); if (worker.last_send_time == 0) - nulls[3] = true; + nulls[4] = true; else - values[3] = TimestampTzGetDatum(worker.last_send_time); + values[4] = TimestampTzGetDatum(worker.last_send_time); if (worker.last_recv_time == 0) - nulls[4] = true; + nulls[5] = true; else - values[4] = TimestampTzGetDatum(worker.last_recv_time); + values[5] = TimestampTzGetDatum(worker.last_recv_time); if (XLogRecPtrIsInvalid(worker.reply_lsn)) - nulls[5] = true; + nulls[6] = true; else - values[5] = LSNGetDatum(worker.reply_lsn); + values[6] = LSNGetDatum(worker.reply_lsn); if (worker.reply_time == 0) - nulls[6] = true; + nulls[7] = true; else - values[6] = TimestampTzGetDatum(worker.reply_time); + values[7] = TimestampTzGetDatum(worker.reply_time); tuplestore_putvalues(tupstore, tupdesc, values, nulls); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5529ac8..b228acf 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -30,6 +30,8 @@ #include "miscadmin.h" +#include "access/heapam.h" +#include "access/htup.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -43,6 +45,7 @@ #include "storage/procarray.h" #include "utils/memutils.h" +#include "utils/tuplestore.h" /* data for errcontext callback */ typedef struct LogicalErrorCallbackState @@ -459,9 +462,12 @@ FreeDecodingContext(LogicalDecodingContext *ctx) if (ctx->callbacks.shutdown_cb != NULL) shutdown_cb_wrapper(ctx); - ReorderBufferFree(ctx->reorder); - FreeSnapshotBuilder(ctx->snapshot_builder); - XLogReaderFree(ctx->reader); + if (ctx->reorder) + ReorderBufferFree(ctx->reorder); + if (ctx->snapshot_builder) + FreeSnapshotBuilder(ctx->snapshot_builder); + if (ctx->reader) + XLogReaderFree(ctx->reader); MemoryContextDelete(ctx->context); } diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 383c6eb..cf161d2 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -19,6 +19,7 @@ #include "access/heapam.h" #include "access/sysattr.h" #include "catalog/namespace.h" +#include "catalog/pg_subscription_rel.h" #include "nodes/makefuncs.h" #include "replication/logicalrelation.h" #include "replication/worker_internal.h" @@ -357,6 +358,12 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) else entry->localrel = heap_open(entry->localreloid, lockmode); + if (entry->state != SUBREL_STATE_READY) + entry->state = GetSubscriptionRelState(MySubscription->oid, + entry->localreloid, + &entry->statelsn, + true); + return entry; } diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c 
index 1e02aa9..284bb02 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -498,51 +498,32 @@ SnapBuildBuildSnapshot(SnapBuild *builder, TransactionId xid) } /* - * Export a snapshot so it can be set in another session with SET TRANSACTION - * SNAPSHOT. - * - * For that we need to start a transaction in the current backend as the - * importing side checks whether the source transaction is still open to make - * sure the xmin horizon hasn't advanced since then. + * Build the initial slot snapshot and convert it to a normal snapshot that + * is understood by HeapTupleSatisfiesMVCC. * - * After that we convert a locally built snapshot into the normal variant - * understood by HeapTupleSatisfiesMVCC et al. + * The snapshot will be usable directly in the current transaction or exported + * for loading in a different transaction. */ -const char * -SnapBuildExportSnapshot(SnapBuild *builder) +Snapshot +SnapBuildInitalSnapshot(SnapBuild *builder) { Snapshot snap; - char *snapname; TransactionId xid; TransactionId *newxip; int newxcnt = 0; + Assert(!FirstSnapshotSet); + Assert(XactIsoLevel == XACT_REPEATABLE_READ); + if (builder->state != SNAPBUILD_CONSISTENT) - elog(ERROR, "cannot export a snapshot before reaching a consistent state"); + elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state"); if (!builder->committed.includes_all_transactions) - elog(ERROR, "cannot export a snapshot, not all transactions are monitored anymore"); + elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore"); /* so we don't overwrite the existing value */ if (TransactionIdIsValid(MyPgXact->xmin)) - elog(ERROR, "cannot export a snapshot when MyPgXact->xmin already is valid"); - - if (IsTransactionOrTransactionBlock()) - elog(ERROR, "cannot export a snapshot from within a transaction"); - - if (SavedResourceOwnerDuringExport) - elog(ERROR, "can only export one snapshot at a time"); - - SavedResourceOwnerDuringExport = CurrentResourceOwner; - ExportInProgress = true; - - StartTransactionCommand(); - - Assert(!FirstSnapshotSet); - - /* There doesn't seem to a nice API to set these */ - XactIsoLevel = XACT_REPEATABLE_READ; - XactReadOnly = true; + elog(ERROR, "cannot build an initial slot snapshot when MyPgXact->xmin already is valid"); snap = SnapBuildBuildSnapshot(builder, GetTopTransactionId()); @@ -577,7 +558,9 @@ SnapBuildExportSnapshot(SnapBuild *builder) if (test == NULL) { if (newxcnt >= GetMaxSnapshotXidCount()) - elog(ERROR, "snapshot too large"); + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("initial slot snapshot too large"))); newxip[newxcnt++] = xid; } @@ -588,9 +571,43 @@ SnapBuildExportSnapshot(SnapBuild *builder) snap->xcnt = newxcnt; snap->xip = newxip; + return snap; +} + +/* + * Export a snapshot so it can be set in another session with SET TRANSACTION + * SNAPSHOT. + * + * For that we need to start a transaction in the current backend as the + * importing side checks whether the source transaction is still open to make + * sure the xmin horizon hasn't advanced since then.
+ */ +const char * +SnapBuildExportSnapshot(SnapBuild *builder) +{ + Snapshot snap; + char *snapname; + + if (IsTransactionOrTransactionBlock()) + elog(ERROR, "cannot export a snapshot from within a transaction"); + + if (SavedResourceOwnerDuringExport) + elog(ERROR, "can only export one snapshot at a time"); + + SavedResourceOwnerDuringExport = CurrentResourceOwner; + ExportInProgress = true; + + StartTransactionCommand(); + + /* There doesn't seem to be a nice API to set these */ + XactIsoLevel = XACT_REPEATABLE_READ; + XactReadOnly = true; + + snap = SnapBuildInitalSnapshot(builder); + /* - * now that we've built a plain snapshot, use the normal mechanisms for - * exporting it + * now that we've built a plain snapshot, make it active and use the + * normal mechanisms for exporting it */ snapname = ExportSnapshot(snap); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c new file mode 100644 index 0000000..e609927 --- /dev/null +++ b/src/backend/replication/logical/tablesync.c @@ -0,0 +1,727 @@ +/*------------------------------------------------------------------------- + * tablesync.c + * PostgreSQL logical replication + * + * Copyright (c) 2012-2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/tablesync.c + * + * NOTES + * This file contains code for initial table data synchronization for + * logical replication. + * + * The initial data synchronization is done separately for each table, + * in a separate apply worker that only fetches the initial snapshot data + * from the publisher and then synchronizes the position in the stream with + * the main apply worker. + * + * There are several reasons for doing the synchronization this way: + * - It allows us to parallelize the initial data synchronization + * which lowers the time needed for it to happen. + * - The initial synchronization does not have to hold the xid and LSN + * for the time it takes to copy data of all tables, causing less + * bloat and lower disk consumption compared to doing the + * synchronization in a single process for the whole database. + * - It allows us to synchronize the tables added after the initial + * synchronization has finished. + * + * The stream position synchronization works in multiple steps. + * - Sync finishes the copy, sets the table state to SYNCWAIT and waits + * for the state to change in a loop. + * - Apply periodically checks tables that are synchronizing for SYNCWAIT. + * When the desired state appears it will compare its position in the + * stream with the SYNCWAIT position and based on that changes the + * state according to the following rules: + * - if the apply is in front of the sync in the WAL stream the new + * state is set to CATCHUP and apply loops until the sync process + * catches up to the same LSN as apply + * - if the sync is in front of the apply in the WAL stream the new + * state is set to SYNCDONE + * - if both apply and sync are at the same position in the WAL stream + * the state of the table is set to READY + * - If the state was set to CATCHUP sync will read the stream and + * apply changes until it catches up to the specified stream + * position and then sets state to READY and signals apply that it + * can stop waiting and exits; if the state was set to something + * other than CATCHUP the sync process will simply end.
+ * - If the state was set to SYNCDONE by apply, the apply will + * continue tracking the table until it reaches the SYNCDONE stream + * position at which point it sets state to READY and stops tracking. + * + * The catalog pg_subscription_rel is used to keep information about + * synchronization state of individual tables. + * + * Example flows look like this: + * - Apply is in front: + * sync:8 + * -> set SYNCWAIT + * apply:10 + * -> set CATCHUP + * -> enter wait-loop + * sync:10 + * -> set READY + * -> exit + * apply:10 + * -> exit wait-loop + * -> continue rep + * - Sync in front: + * sync:10 + * -> set SYNCWAIT + * apply:8 + * -> set SYNCDONE + * -> continue per-table filtering + * sync:10 + * -> exit + * apply:10 + * -> set READY + * -> stop per-table filtering + * -> continue rep + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "pgstat.h" + +#include "access/xact.h" + +#include "catalog/pg_subscription_rel.h" + +#include "commands/copy.h" + +#include "replication/logicallauncher.h" +#include "replication/logicalrelation.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" + +#include "storage/ipc.h" + +#include "utils/lsyscache.h" +#include "utils/memutils.h" + +static List *table_states = NIL; +static bool table_states_valid = false; + +StringInfo copybuf = NULL; + +/* + * Exit routine for synchronization worker. + */ +static void +finish_sync_worker(char *slotname) +{ + /* Flush all writes. */ + XLogFlush(GetXLogWriteRecPtr()); + + /* Find the main apply worker and signal it. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); + + ereport(LOG, + (errmsg("logical replication synchronization worker finished processing"))); + + /* Stop gracefully */ + walrcv_disconnect(wrconn); + proc_exit(0); +} + +/* + * Wait until the table synchronization change. + * + * Returns false if the relation subscription state disappeared. + */ +static bool +wait_for_sync_status_change(SubscriptionRelState *rstate) +{ + int rc; + char state = rstate->state; + + while (!got_SIGTERM) + { + StartTransactionCommand(); + rstate->state = GetSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, + &rstate->lsn, + true); + CommitTransactionCommand(); + + /* Status record was removed. */ + if (rstate->state == SUBREL_STATE_UNKNOWN) + return false; + + if (rstate->state != state) + return true; + + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 10000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + ResetLatch(&MyProc->procLatch); + } + + return false; +} + +/* + * Callback from syscache invalidation. + */ +void +invalidate_syncing_table_states(Datum arg, int cacheid, uint32 hashvalue) +{ + table_states_valid = false; +} + +/* + * Handle table synchronization cooperation from the synchronization + * worker. + * + * If the sync worker is in catch up mode and reached the predetermined + * synchronization point in wal stream, it will mark the table as ready and + * finish. + */ +static void +process_syncing_tables_for_sync(char *slotname, XLogRecPtr end_lsn) +{ + SubscriptionRelState *rstate; + TimeLineID tli; + + Assert(!IsTransactionState()); + + /* + * Synchronization workers don't keep track of all synchronization + * tables, they only care about their table. 
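The coordination rules described in the header comment above reduce to a single comparison between the apply worker's current position and the LSN the sync worker recorded when it entered SYNCWAIT. A minimal sketch of that decision; this only mirrors what process_syncing_tables_for_apply() further down in this file does, and the function itself is illustrative, not part of the patch:

#include "postgres.h"

#include "access/xlogdefs.h"
#include "catalog/pg_subscription_rel.h"

/*
 * Sketch only: given the apply worker's position and the LSN recorded by a
 * sync worker at SYNCWAIT, pick the next state for the table.
 */
static char
decide_next_state(XLogRecPtr apply_lsn, XLogRecPtr sync_lsn)
{
	if (apply_lsn > sync_lsn)
		return SUBREL_STATE_CATCHUP;	/* sync must catch up to apply */
	else if (apply_lsn < sync_lsn)
		return SUBREL_STATE_SYNCDONE;	/* apply keeps tracking the table */
	else
		return SUBREL_STATE_READY;		/* both are at the same position */
}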
+ */ + if (!table_states_valid) + { + MemoryContext oldctx; + + /* Clean the old list. */ + list_free_deep(table_states); + table_states = NIL; + + StartTransactionCommand(); + + /* Allocate the tracking info in a permanent memory context. */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + rstate = palloc(sizeof(SubscriptionRelState)); + MemoryContextSwitchTo(oldctx); + + /* Fetch the table state. */ + rstate->state = GetSubscriptionRelState(MySubscription->oid, + MyLogicalRepWorker->relid, + &rstate->lsn, true); + + if (rstate->state != SUBREL_STATE_UNKNOWN) + table_states = list_make1(rstate); + + CommitTransactionCommand(); + + table_states_valid = true; + } + + /* Somebody removed table underneath this worker, nothing more to do. */ + if (list_length(table_states) == 0) + { + walrcv_endstreaming(wrconn, &tli); + finish_sync_worker(slotname); + } + + Assert(list_length(table_states) == 1); + + /* Check if we are done with catchup now. */ + rstate = (SubscriptionRelState *) linitial(table_states); + Assert(rstate->relid == MyLogicalRepWorker->relid); + + if (rstate->state == SUBREL_STATE_CATCHUP) + { + Assert(rstate->lsn != InvalidXLogRecPtr); + + if (rstate->lsn == end_lsn) + { + rstate->state = SUBREL_STATE_READY; + rstate->lsn = InvalidXLogRecPtr; + /* Update state of the synchronization. */ + StartTransactionCommand(); + SetSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); + CommitTransactionCommand(); + + walrcv_endstreaming(wrconn, &tli); + finish_sync_worker(slotname); + } + } +} + +/* + * Handle table synchronization cooperation from the apply worker. + * + * Walk over all subscription tables that are individually tracked by apply + * process (currently all that have state other than SUBREL_STATE_READY) and + * manage synchronization for them. + * + * In case there are tables that need synchronized and are not being + * synchronized yet (and there are free slots for sync workers) it will start + * sync workers for them. + * + * For tables that are being synchronized already, it will check if sync + * workers either need action from the apply worker or have finished. + * + * The usual action needed by apply is to mark table for catchup and wait for + * the catchup to happen. In case that sync worker got in front of apply + * worker it will mark the table as synced but not ready yet as it needs to be + * tracked until apply reaches the same position to which it was synced. + * + * In case the synchronization position is reached the table can be marked + * as ready and no longer tracked. + */ +static void +process_syncing_tables_for_apply(char *slotname, XLogRecPtr end_lsn) +{ + ListCell *lc; + + Assert(!IsTransactionState()); + + /* We need up to date sync state info for subscription tables here. */ + if (!table_states_valid) + { + MemoryContext oldctx; + List *rstates; + ListCell *lc; + SubscriptionRelState *rstate; + + /* Clean the old list. */ + list_free_deep(table_states); + table_states = NIL; + + StartTransactionCommand(); + + /* Fetch all non-ready tables. */ + rstates = GetSubscriptionNotReadyRelations(MySubscription->oid); + + /* Allocate the tracking info in a permanent memory context. 
*/ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + foreach(lc, rstates) + { + rstate = palloc(sizeof(SubscriptionRelState)); + memcpy(rstate, lfirst(lc), sizeof(SubscriptionRelState)); + table_states = lappend(table_states, rstate); + } + MemoryContextSwitchTo(oldctx); + + CommitTransactionCommand(); + + table_states_valid = true; + } + + /* Process all tables that are being synchronized. */ + foreach(lc, table_states) + { + SubscriptionRelState *rstate = (SubscriptionRelState *)lfirst(lc); + + /* + * When the synchronization process is at the catchup phase we need + * to ensure that we are not behind it (it's going to wait at this + * point for the change of state). Once we are in front or at the + * same position as the synchronization process we can signal it to + * finish the catchup. + */ + if (rstate->state == SUBREL_STATE_SYNCWAIT) + { + if (end_lsn > rstate->lsn) + { + /* + * Apply is in front, tell sync to catchup. And wait until + * it does. + */ + rstate->state = SUBREL_STATE_CATCHUP; + rstate->lsn = end_lsn; + StartTransactionCommand(); + SetSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); + CommitTransactionCommand(); + + /* Signal the worker as it may be waiting for us. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, + rstate->relid); + + /* + * Enter busy loop and wait for synchronization status + * change. + * + * XXX: If the sync worker gets into failure loop here + * for some reason, the replication will stall forever. + */ + if (wait_for_sync_status_change(rstate)) + Assert(rstate->state == SUBREL_STATE_READY); + } + else + { + /* + * Apply is either behind in which case sync worker is done + * but apply needs to keep tracking the table until it + * catches up to where sync finished. + * Or apply and sync are at the same position in which case + * table can be switched to standard replication mode + * immediately. + */ + if (end_lsn < rstate->lsn) + rstate->state = SUBREL_STATE_SYNCDONE; + else + rstate->state = SUBREL_STATE_READY; + + StartTransactionCommand(); + SetSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); + CommitTransactionCommand(); + + /* Signal the worker as it may be waiting for us. */ + logicalrep_worker_wakeup(MyLogicalRepWorker->subid, + rstate->relid); + } + } + else if (rstate->state == SUBREL_STATE_SYNCDONE && + end_lsn >= rstate->lsn) + { + /* + * Apply has caught up to the position where the table sync + * has finished, time to mark the table as ready so that + * apply will just continue to replicate it normally. + */ + rstate->state = SUBREL_STATE_READY; + rstate->lsn = InvalidXLogRecPtr; + StartTransactionCommand(); + SetSubscriptionRelState(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); + CommitTransactionCommand(); + } + + /* + * In case table is supposed to be synchronizing but the + * synchronization worker is not running, start it. + * Limit the number of launched workers here to one (for now). 
+ */ + if (rstate->state != SUBREL_STATE_READY && + rstate->state != SUBREL_STATE_SYNCDONE) + { + LogicalRepWorker *worker; + int nworkers; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_worker_find(MyLogicalRepWorker->subid, + rstate->relid, false); + nworkers = logicalrep_sync_worker_count(MyLogicalRepWorker->subid); + LWLockRelease(LogicalRepWorkerLock); + + /* + * If there is no sync worker registered for the table and there + * is some free sync worker slot, start a new sync worker for the + * table. + */ + if (!worker && nworkers < max_sync_workers_per_subscription) + logicalrep_worker_launch(MyLogicalRepWorker->dbid, + MySubscription->oid, + MySubscription->name, + MyLogicalRepWorker->userid, + rstate->relid); + + } + } +} + +/* + * Process possible state change(s) of tables that are being synchronized. + */ +void +process_syncing_tables(char *slotname, XLogRecPtr end_lsn) +{ + if (OidIsValid(MyLogicalRepWorker->relid)) + process_syncing_tables_for_sync(slotname, end_lsn); + else + process_syncing_tables_for_apply(slotname, end_lsn); +} + +/* + * Create list of columns for COPY based on logical relation mapping. + */ +static List * +make_copy_attnamelist(LogicalRepRelMapEntry *rel) +{ + List *attnamelist = NIL; + TupleDesc desc = RelationGetDescr(rel->localrel); + int i; + + for (i = 0; i < desc->natts; i++) + { + int remoteattnum = rel->attrmap[i]; + + /* Skip dropped attributes. */ + if (desc->attrs[i]->attisdropped) + continue; + + /* Skip attributes that are missing on remote side. */ + if (remoteattnum < 0) + continue; + + attnamelist = lappend(attnamelist, + makeString(rel->remoterel.attnames[remoteattnum])); + } + + return attnamelist; +} + +/* + * Callback for the COPY FROM which reads from the remote connection + * and passes the data back to our local COPY. + */ +static int +copy_read_data(void *outbuf, int minread, int maxread) +{ + int bytesread = 0; + int avail; + + /* If there is some leftover data from the previous read, use it. */ + avail = copybuf->len - copybuf->cursor; + if (avail) + { + if (avail > maxread) + avail = maxread; + memcpy(outbuf, &copybuf->data[copybuf->cursor], avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + while (!got_SIGTERM && maxread > 0 && bytesread < minread) + { + pgsocket fd = PGINVALID_SOCKET; + int rc; + int len; + char *buf = NULL; + + for (;;) + { + /* Try to read the data. */ + len = walrcv_receive(wrconn, &buf, &fd); + + CHECK_FOR_INTERRUPTS(); + + if (len == 0) + { + break; + } + else if (len < 0) + { + return bytesread; + } + else + { + /* Process the data */ + copybuf->data = buf; + copybuf->len = len; + copybuf->cursor = 0; + + avail = copybuf->len - copybuf->cursor; + if (avail > maxread) + avail = maxread; + memcpy(outbuf, &copybuf->data[copybuf->cursor], avail); + outbuf = (void *) ((char *) outbuf + avail); + copybuf->cursor += avail; + maxread -= avail; + bytesread += avail; + } + + if (maxread <= 0 || bytesread >= minread) + return bytesread; + } + + /* + * Wait for more data or latch. + */ + rc = WaitLatchOrSocket(&MyProc->procLatch, + WL_SOCKET_READABLE | WL_LATCH_SET | + WL_TIMEOUT | WL_POSTMASTER_DEATH, + fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + ResetLatch(&MyProc->procLatch); + } + + /* Check for exit condition. */ + if (got_SIGTERM) + proc_exit(0); + + return bytesread; +} + +/* + * Copy existing data of a table from the publisher.
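copy_read_data() above is plugged into COPY through the new copy_data_source_cb hook that this patch adds to BeginCopyFrom() (see the commands/copy.h hunk below). A minimal sketch of that callback contract, feeding COPY from an in-memory string instead of a replication connection; all names here are illustrative and not part of the patch:

#include "postgres.h"

#include <string.h>

/* Pretend data source: a COPY text-format payload held in memory. */
static const char *src_data = "1\tfoo\n2\tbar\n";
static size_t src_pos = 0;

/*
 * copy_data_source_cb: fill outbuf with at most maxread bytes and return the
 * number of bytes written.  Judging from copy_read_data() above, the callback
 * tries to return at least minread bytes, and a shorter return value is taken
 * as the end of the data.
 */
static int
memory_read_data(void *outbuf, int minread, int maxread)
{
	size_t		avail = strlen(src_data) - src_pos;
	int			nbytes = (int) Min(avail, (size_t) maxread);

	memcpy(outbuf, src_data + src_pos, nbytes);
	src_pos += nbytes;

	return nbytes;
}

/*
 * Usage, mirroring copy_table() below:
 *   cstate = BeginCopyFrom(NULL, rel, NULL, false, memory_read_data, NIL, NIL);
 *   (void) CopyFrom(cstate);
 */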
+ * + * Caller is responsible for locking the local relation. + */ +static void +copy_table(Relation rel) +{ + LogicalRepRelMapEntry *relmapentry; + LogicalRepRelation lrel; + CopyState cstate; + List *attnamelist; + + /* Get the publisher relation info. */ + walrcv_table_info(wrconn, + get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel), &lrel); + + /* Put the relation into relmap. */ + logicalrep_relmap_update(&lrel); + + /* Map the publisher relation to the local one. */ + relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); + Assert(rel == relmapentry->localrel); + + /* Start the copy on the publisher. */ + walrcv_table_copy(wrconn, + get_namespace_name(RelationGetNamespace(rel)), + RelationGetRelationName(rel)); + + copybuf = makeStringInfo(); + + /* Create CopyState for ingestion of the data from the publisher. */ + attnamelist = make_copy_attnamelist(relmapentry); + cstate = BeginCopyFrom(NULL, rel, NULL, false, copy_read_data, attnamelist, NIL); + + /* Do the copy */ + (void) CopyFrom(cstate); + + logicalrep_rel_close(relmapentry, NoLock); + + CommandCounterIncrement(); +} + +/* + * Start syncing the table in the sync worker. + * + * The returned slot name is palloc'd in the current memory context. + */ +char * +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) +{ + SubscriptionRelState rstate; + char slotname[NAMEDATALEN]; + char *err; + + /* Check the state of the table synchronization. */ + StartTransactionCommand(); + rstate.relid = MyLogicalRepWorker->relid; + rstate.state = GetSubscriptionRelState(MySubscription->oid, rstate.relid, + &rstate.lsn, false); + + /* + * We are limited to 63 characters for the slot name, so we cut the + * original slot name to 36 chars because "_sync_" adds 6, each + * unsigned integer (oid) has a maximum of 10 characters, and we have + * one additional "_" separator between the slot name and the subscription oid. + */ + snprintf(slotname, NAMEDATALEN, "%.36s_%u_sync_%u", + MySubscription->slotname, MySubscription->oid, rstate.relid); + + CommitTransactionCommand(); + + wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err); + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + switch (rstate.state) + { + case SUBREL_STATE_SYNCWAIT: + case SUBREL_STATE_CATCHUP: + /* TODO: truncate table */ + case SUBREL_STATE_INIT: + case SUBREL_STATE_DATA: + { + Relation rel; + XLogRecPtr lsn; + + /* Update the state and make it visible to others. */ + StartTransactionCommand(); + SetSubscriptionRelState(MySubscription->oid, rstate.relid, + SUBREL_STATE_DATA, + InvalidXLogRecPtr); + CommitTransactionCommand(); + + /* + * We want to do the table data sync in a single + * transaction, so the transaction started here is not + * closed until the copy is done. + */ + StartTransactionCommand(); + + /* + * Don't allow parallel access other than SELECT while + * the initial contents are being copied. + */ + rel = heap_open(rstate.relid, ExclusiveLock); + + /* + * Create a temporary slot for the sync process. + * We do this inside a transaction so that we can use the + * snapshot made by the slot to get the existing data.
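The slot name computed above always fits into NAMEDATALEN: 36 (truncated slot name) + 1 ("_") + 10 (maximum OID digits) + 6 ("_sync_") + 10 (maximum OID digits) = 63 = NAMEDATALEN - 1. A small self-contained check of that arithmetic; the function and the input values are made up for illustration:

#include "postgres.h"

/* Sketch: verify the worst case of the sync-slot name format used above. */
static void
check_sync_slot_name_length(void)
{
	char		buf[NAMEDATALEN];
	int			len;

	/* 36 + 1 + 10 + 6 + 10 = 63 = NAMEDATALEN - 1 */
	len = snprintf(buf, NAMEDATALEN, "%.36s_%u_sync_%u",
				   "a_subscription_slot_name_longer_than_thirtysix_chars",
				   4294967295U, 4294967295U);

	Assert(len == NAMEDATALEN - 1);
}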
+ */ + if (!walrcv_command(wrconn, + "BEGIN READ ONLY ISOLATION LEVEL " + "REPEATABLE READ", + &err)) + ereport(ERROR, + (errmsg("table copy could not start transaction on publisher"), + errdetail("The error was: %s", err))); + walrcv_create_slot(wrconn, slotname, true, &lsn); + + copy_table(rel); + + if (!walrcv_command(wrconn, "ROLLBACK", &err)) + ereport(ERROR, + (errmsg("table copy could not finish transaction on publisher"), + errdetail("The error was: %s", err))); + + /* + * We are done with the initial data synchronization; + * update the state. + */ + SetSubscriptionRelState(MySubscription->oid, rstate.relid, + SUBREL_STATE_SYNCWAIT, lsn); + heap_close(rel, NoLock); + + /* End the transaction. */ + CommitTransactionCommand(); + + /* + * Wait for the main apply worker to either tell us to + * catch up or that we are done. + */ + wait_for_sync_status_change(&rstate); + if (rstate.state != SUBREL_STATE_CATCHUP) + finish_sync_worker(slotname); + break; + } + case SUBREL_STATE_SYNCDONE: + case SUBREL_STATE_READY: + /* Nothing to do here but finish. */ + finish_sync_worker(slotname); + default: + elog(ERROR, "unknown relation state \"%c\"", rstate.state); + } + + return pstrdup(slotname); +} diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 28f3fc5..d47d9f1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -35,6 +35,7 @@ #include "catalog/namespace.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" #include "commands/trigger.h" @@ -104,14 +105,15 @@ typedef struct SlotErrCallbackArg } SlotErrCallbackArg; static MemoryContext ApplyContext = NULL; -static MemoryContext ApplyCacheContext = NULL; +MemoryContext ApplyCacheContext = NULL; WalReceiverConn *wrconn = NULL; Subscription *MySubscription = NULL; bool MySubscriptionValid = false; -bool in_remote_transaction = false; +static char *myslotname = NULL; +bool in_remote_transaction = false; static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); @@ -120,6 +122,22 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void reread_subscription(void); /* + * Should this worker apply changes for the given relation? + * + * This is mainly needed for the initial relation data sync as that runs in + * a separate worker process running in parallel and we need some way to skip + * changes coming to the main apply worker during the sync of a table. + */ +static bool +should_apply_changes_for_rel(LogicalRepRelMapEntry *rel) +{ + return rel->state == SUBREL_STATE_READY || + (rel->state == SUBREL_STATE_SYNCDONE && + rel->statelsn < replorigin_session_origin_lsn) || + rel->localreloid == MyLogicalRepWorker->relid; +} + +/* * Make sure that we started local transaction. * * Also switches to ApplyContext as necessary. @@ -428,6 +446,9 @@ apply_handle_commit(StringInfo s) in_remote_transaction = false; + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(myslotname, commit_data.end_lsn); + pgstat_report_activity(STATE_IDLE, NULL); } @@ -516,6 +537,15 @@ apply_handle_insert(StringInfo s) relid = logicalrep_read_insert(s, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + return; + } /* Initialize the executor state.
*/ estate = create_estate_for_relation(rel); @@ -604,6 +634,15 @@ apply_handle_update(StringInfo s) relid = logicalrep_read_update(s, &has_oldtup, &oldtup, &newtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + return; + } /* Check if we can do the update. */ check_relation_updatable(rel); @@ -709,6 +748,15 @@ apply_handle_delete(StringInfo s) relid = logicalrep_read_delete(s, &oldtup); rel = logicalrep_rel_open(relid, RowExclusiveLock); + if (!should_apply_changes_for_rel(rel)) + { + /* + * The relation can't become interesting in the middle of the + * transaction so it's safe to unlock it. + */ + logicalrep_rel_close(rel, RowExclusiveLock); + return; + } /* Check if we can do the delete. */ check_relation_updatable(rel); @@ -916,10 +964,8 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply) * Apply main loop. */ static void -ApplyLoop(void) +LogicalRepApplyLoop(XLogRecPtr last_received) { - XLogRecPtr last_received = InvalidXLogRecPtr; - /* Init the ApplyContext which we use for easier cleanup. */ ApplyContext = AllocSetContextCreate(TopMemoryContext, "ApplyContext", @@ -1034,6 +1080,9 @@ ApplyLoop(void) if (!MySubscriptionValid) reread_subscription(); CommitTransactionCommand(); + + /* Process any table synchronization changes. */ + process_syncing_tables(myslotname, last_received); } /* confirm all writes at once */ @@ -1045,7 +1094,11 @@ ApplyLoop(void) /* Check if we need to exit the streaming loop. */ if (endofstream) + { + TimeLineID tli; + walrcv_endstreaming(wrconn, &tli); break; + } /* * Wait for more data or latch. @@ -1330,11 +1383,7 @@ ApplyWorkerMain(Datum main_arg) int worker_slot = DatumGetObjectId(main_arg); MemoryContext oldctx; char originname[NAMEDATALEN]; - RepOriginId originid; XLogRecPtr origin_startpos; - char *err; - int server_version; - TimeLineID startpointTLI; WalRcvStreamOptions options; /* Attach to slot */ @@ -1393,36 +1442,77 @@ ApplyWorkerMain(Datum main_arg) subscription_change_cb, (Datum) 0); - ereport(LOG, - (errmsg("logical replication apply for subscription \"%s\" has started", - MySubscription->name))); - - /* Setup replication origin tracking. */ - snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - originid = replorigin_create(originname); - replorigin_session_setup(originid); - replorigin_session_origin = originid; - origin_startpos = replorigin_session_get_progress(false); + if (OidIsValid(MyLogicalRepWorker->relid)) + elog(LOG, "logical replication sync for subscription %s, table %s started", + MySubscription->name, get_rel_name(MyLogicalRepWorker->relid)); + else + elog(LOG, "logical replication apply for subscription %s started", + MySubscription->name); CommitTransactionCommand(); /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); - wrconn = walrcv_connect(MySubscription->conninfo, true, - MySubscription->name, &err); - if (wrconn == NULL) - ereport(ERROR, - (errmsg("could not connect to the publisher: %s", err))); + + if (OidIsValid(MyLogicalRepWorker->relid)) + { + char *syncslotname; + + /* This is table synchroniation worker, call initial sync. 
*/ + syncslotname = LogicalRepSyncTableStart(&origin_startpos); + + /* The slot name needs to be allocated in permanent memory context. */ + oldctx = MemoryContextSwitchTo(ApplyCacheContext); + syncslotname = pstrdup(syncslotname); + MemoryContextSwitchTo(oldctx); + + pfree(syncslotname); + } + else + { + /* This is main apply worker */ + RepOriginId originid; + TimeLineID startpointTLI; + char *err; + int server_version; + + myslotname = MySubscription->slotname; + + /* Setup replication origin tracking. */ + StartTransactionCommand(); + snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid); + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + replorigin_session_setup(originid); + replorigin_session_origin = originid; + origin_startpos = replorigin_session_get_progress(false); + CommitTransactionCommand(); + + wrconn = walrcv_connect(MySubscription->conninfo, true, myslotname, + &err); + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the publisher: %s", err))); + + /* + * We don't really use the output identify_system for anything + * but it does some initializations on the upstream so let's still + * call it. + */ + (void) walrcv_identify_system(wrconn, &startpointTLI, + &server_version); + + } /* - * We don't really use the output identify_system for anything - * but it does some initializations on the upstream so let's still - * call it. + * Setup callback for syscache so that we know when something + * changes in the subscription relation state. */ - (void) walrcv_identify_system(wrconn, &startpointTLI, &server_version); + CacheRegisterSyscacheCallback(SUBSCRIPTIONRELOID, + invalidate_syncing_table_states, + (Datum) 0); /* Build logical replication streaming options. */ options.logical = true; @@ -1431,11 +1521,11 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM; options.proto.logical.publication_names = MySubscription->publications; - /* Start streaming from the slot. */ + /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); /* Run the main loop. */ - ApplyLoop(); + LogicalRepApplyLoop(origin_startpos); walrcv_disconnect(wrconn); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 04dde5d..ecb797f 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -24,6 +24,7 @@ #include "utils/int8.h" #include "utils/memutils.h" #include "utils/syscache.h" +#include "utils/tuplestore.h" PG_MODULE_MAGIC; diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index d962c76..ea11f06 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -25,6 +25,7 @@ /* Result of the parsing is returned here */ Node *replication_parse_result; +static SQLCmd *make_sqlcmd(void); /* * Bison doesn't allocate anything that needs to live across parser calls, @@ -57,6 +58,7 @@ Node *replication_parse_result; %token SCONST IDENT %token UCONST %token RECPTR +%token T_WORD /* Keyword tokens. 
*/ %token K_BASE_BACKUP @@ -82,7 +84,7 @@ Node *replication_parse_result; %type command %type base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system - timeline_history + timeline_history sql_cmd %type base_backup_opt_list %type base_backup_opt %type opt_timeline @@ -112,6 +114,7 @@ command: | create_replication_slot | drop_replication_slot | timeline_history + | sql_cmd ; /* @@ -330,6 +333,26 @@ plugin_opt_arg: SCONST { $$ = (Node *) makeString($1); } | /* EMPTY */ { $$ = NULL; } ; + +sql_cmd: + IDENT { $$ = (Node *) make_sqlcmd(); } + ; %% +static SQLCmd * +make_sqlcmd(void) +{ + SQLCmd *cmd = makeNode(SQLCmd); + int tok; + + /* Just move lexer to the end of command. */ + for (;;) + { + tok = yylex(); + if (tok == ';' || tok == 0) + break; + } + return cmd; +} + #include "repl_scanner.c" diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index a3b5f92..9c616cf 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -177,9 +177,7 @@ TEMPORARY { return K_TEMPORARY; } } . { - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("syntax error: unexpected character \"%s\"", yytext))); + return T_WORD; } %% diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f3082c3..429c1eb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -842,7 +842,31 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) * Export a plain (not of the snapbuild.c type) snapshot to the user * that can be imported into another session. */ - snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); + if (!IsTransactionBlock()) + snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); + else + { + Snapshot snap; + + if (XactIsoLevel != XACT_REPEATABLE_READ) + ereport(ERROR, + (errmsg("CREATE_REPLICATION_SLOT ... LOGICAL must be " + "called in REPEATABLE READ isolation mode transaction"))); + + if (FirstSnapshotSet) + ereport(ERROR, + (errmsg("CREATE_REPLICATION_SLOT ... LOGICAL must be called before any query"))); + + if (IsSubTransaction()) + ereport(ERROR, + (errmsg("CREATE_REPLICATION_SLOT ... LOGICAL must not be called in a subtransaction"))); + + XactReadOnly = true; + snap = SnapBuildInitalSnapshot(ctx->snapshot_builder); + MyPgXact->xmin = snap->xmin; + + RestoreTransactionSnapshot(snap, MyProc); + } /* don't need the decoding context anymore */ FreeDecodingContext(ctx); @@ -1293,7 +1317,7 @@ WalSndWaitForWal(XLogRecPtr loc) /* * Execute an incoming replication command. */ -void +bool exec_replication_command(const char *cmd_string) { int parse_rc; @@ -1309,14 +1333,6 @@ exec_replication_command(const char *cmd_string) ereport(log_replication_commands ? LOG : DEBUG1, (errmsg("received replication command: %s", cmd_string))); - /* - * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next - * command arrives. Clean up the old stuff if there's anything. - */ - SnapBuildClearExportedSnapshot(); - - CHECK_FOR_INTERRUPTS(); - cmd_context = AllocSetContextCreate(CurrentMemoryContext, "Replication command context", ALLOCSET_DEFAULT_SIZES); @@ -1332,6 +1348,25 @@ exec_replication_command(const char *cmd_string) cmd_node = replication_parse_result; + /* + * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was + * called outside of transaction the snapshot should be cleared here. 
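Together, the T_WORD fallback in the scanner above, the generic SQLCmd rule, and the walsender changes in this file let a logical replication connection ("replication=database") mix plain SQL with replication commands, and let CREATE_REPLICATION_SLOT ... LOGICAL run inside a REPEATABLE READ transaction so that the slot's snapshot is usable in that same transaction. A rough libpq sketch of the conversation the new table sync worker has with the publisher; the connection string, slot and table names, and the exact COPY statement are assumptions for illustration, error handling is omitted, and the real client side is libpqwalreceiver.c behind the walrcv_* wrappers:

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
	/* Hypothetical connection string; "replication=database" gives a
	 * walsender that, with this patch, can also run plain SQL. */
	PGconn	   *conn = PQconnectdb("dbname=postgres replication=database");
	char	   *buf;
	int			len;

	/* Creating the slot inside the transaction installs its snapshot here. */
	PQclear(PQexec(conn, "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ"));
	PQclear(PQexec(conn, "CREATE_REPLICATION_SLOT \"mysub_16390_sync_16402\" "
						 "TEMPORARY LOGICAL pgoutput"));

	/* The COPY therefore sees the table as of the slot's consistent point,
	 * i.e. exactly the initial contents that the slot will not stream. */
	PQclear(PQexec(conn, "COPY public.mytable TO STDOUT"));
	while ((len = PQgetCopyData(conn, &buf, 0)) > 0)
	{
		fwrite(buf, 1, len, stdout);
		PQfreemem(buf);
	}
	/* (final PQgetResult() call omitted for brevity) */

	PQclear(PQexec(conn, "ROLLBACK"));
	PQfinish(conn);
	return 0;
}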
+ */ + if (!IsTransactionBlock()) + SnapBuildClearExportedSnapshot(); + + /* + * For aborted transactions, don't allow anything except pure SQL, + * the exec_simple_query() will handle it correctly. + */ + if (IsAbortedTransactionBlockState() && cmd_node->type != T_SQLCmd) + ereport(ERROR, + (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), + errmsg("current transaction is aborted, " + "commands ignored until end of transaction block"))); + + CHECK_FOR_INTERRUPTS(); + switch (cmd_node->type) { case T_IdentifySystemCmd: @@ -1339,6 +1374,7 @@ exec_replication_command(const char *cmd_string) break; case T_BaseBackupCmd: + PreventTransactionChain(true, "BASE_BACKUP"); SendBaseBackup((BaseBackupCmd *) cmd_node); break; @@ -1354,6 +1390,8 @@ exec_replication_command(const char *cmd_string) { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; + PreventTransactionChain(true, "START_REPLICATION"); + if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else @@ -1362,9 +1400,16 @@ exec_replication_command(const char *cmd_string) } case T_TimeLineHistoryCmd: + PreventTransactionChain(true, "TIMELINE_HISTORY"); SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); break; + case T_SQLCmd: + if (MyDatabaseId == InvalidOid) + ereport(ERROR, + (errmsg("not connected to database"))); + + return false; default: elog(ERROR, "unrecognized replication command node tag: %u", cmd_node->type); @@ -1376,6 +1421,8 @@ exec_replication_command(const char *cmd_string) /* Send CommandComplete message */ EndCommand("SELECT", DestRemote); + + return true; } /* diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index bb89cce..49922b4 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4054,6 +4054,7 @@ PostgresMain(int argc, char *argv[], case 'Q': /* simple query */ { const char *query_string; + bool walsender_query = false; /* Set statement_timestamp() */ SetCurrentStatementStartTimestamp(); @@ -4062,8 +4063,8 @@ PostgresMain(int argc, char *argv[], pq_getmsgend(&input_message); if (am_walsender) - exec_replication_command(query_string); - else + walsender_query = exec_replication_command(query_string); + if (!walsender_query) exec_simple_query(query_string); send_ready_for_query = true; diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c index 66d09bc..50efe99 100644 --- a/src/backend/utils/adt/misc.c +++ b/src/backend/utils/adt/misc.c @@ -892,3 +892,23 @@ parse_ident(PG_FUNCTION_ARGS) PG_RETURN_DATUM(makeArrayResult(astate, CurrentMemoryContext)); } + +/* + * SQL wrapper around RelationGetReplicaIndex(). 
+ */ +Datum +pg_get_replica_identity_index(PG_FUNCTION_ARGS) +{ + Oid reloid = PG_GETARG_OID(0); + Oid idxoid; + Relation rel; + + rel = heap_open(reloid, AccessShareLock); + idxoid = RelationGetReplicaIndex(rel); + heap_close(rel, AccessShareLock); + + if (OidIsValid(idxoid)) + PG_RETURN_OID(idxoid); + else + PG_RETURN_NULL(); +} diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index bdfaa0c..8fe4032 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -62,6 +62,7 @@ #include "catalog/pg_replication_origin.h" #include "catalog/pg_statistic.h" #include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_transform.h" #include "catalog/pg_ts_config.h" @@ -760,6 +761,28 @@ static const struct cachedesc cacheinfo[] = { }, 4 }, + {SubscriptionRelRelationId, /* SUBSCRIPTIONRELOID */ + SubscriptionRelOidIndexId, + 1, + { + ObjectIdAttributeNumber, + 0, + 0, + 0 + }, + 64 + }, + {SubscriptionRelRelationId, /* SUBSCRIPTIONRELMAP */ + SubscriptionRelMapIndexId, + 2, + { + Anum_pg_subscription_rel_srrelid, + Anum_pg_subscription_rel_srsubid, + 0, + 0 + }, + 64 + }, {TableSpaceRelationId, /* TABLESPACEOID */ TablespaceOidIndexId, 1, diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 15a09df..e09693b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2485,6 +2485,18 @@ static struct config_int ConfigureNamesInt[] = }, { + {"max_sync_workers_per_subscription", + PGC_POSTMASTER, + RESOURCES_ASYNCHRONOUS, + gettext_noop("Maximum number of table synchronization workers per subscription."), + NULL, + }, + &max_sync_workers_per_subscription, + 2, 1, MAX_BACKENDS, + NULL, NULL, NULL + }, + + { {"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Automatic log file rotation will occur after N minutes."), NULL, diff --git a/src/include/catalog/indexing.h b/src/include/catalog/indexing.h index 45605a0..74908fa 100644 --- a/src/include/catalog/indexing.h +++ b/src/include/catalog/indexing.h @@ -343,6 +343,12 @@ DECLARE_UNIQUE_INDEX(pg_subscription_oid_index, 6114, on pg_subscription using b DECLARE_UNIQUE_INDEX(pg_subscription_subname_index, 6115, on pg_subscription using btree(subdbid oid_ops, subname name_ops)); #define SubscriptionNameIndexId 6115 +DECLARE_UNIQUE_INDEX(pg_subscription_rel_oid_index, 6116, on pg_subscription_rel using btree(oid oid_ops)); +#define SubscriptionRelOidIndexId 6116 + +DECLARE_UNIQUE_INDEX(pg_subscription_rel_map_index, 6117, on pg_subscription_rel using btree(srrelid oid_ops, srsubid oid_ops)); +#define SubscriptionRelMapIndexId 6117 + /* last step of initialization script: build the indexes declared above */ BUILD_INDICES diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 3a57d3b..130b3f3 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2016,6 +2016,9 @@ DESCR("is a relation insertable/updatable/deletable"); DATA(insert OID = 3843 ( pg_column_is_updatable PGNSP PGUID 12 10 0 0 0 f f f f t f s s 3 0 16 "2205 21 16" _null_ _null_ _null_ _null_ _null_ pg_column_is_updatable _null_ _null_ _null_ )); DESCR("is a column updatable"); +DATA(insert OID = 6120 ( pg_get_replica_identity_index PGNSP PGUID 12 10 0 0 0 f f f f t f s s 1 0 2205 "2205" _null_ _null_ _null_ _null_ _null_ pg_get_replica_identity_index _null_ _null_ _null_ )); +DESCR("oid of replica identity index if any"); + /* Deferrable unique constraint 
trigger */ DATA(insert OID = 1250 ( unique_key_recheck PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 2279 "" _null_ _null_ _null_ _null_ _null_ unique_key_recheck _null_ _null_ _null_ )); DESCR("deferred UNIQUE constraint check"); @@ -2772,7 +2775,7 @@ DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f DESCR("statistics: information about currently active replication"); DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ _null_ )); DESCR("statistics: information about WAL receiver"); -DATA(insert OID = 6118 ( pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f f s r 1 0 2249 "26" "{26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o}" "{subid,subid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ )); +DATA(insert OID = 6118 ( pg_stat_get_subscription PGNSP PGUID 12 1 0 0 0 f f f f f f s r 1 0 2249 "26" "{26,26,26,23,3220,1184,1184,3220,1184}" "{i,o,o,o,o,o,o,o,o}" "{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}" _null_ _null_ pg_stat_get_subscription _null_ _null_ _null_ )); DESCR("statistics: information about subscription"); DATA(insert OID = 2026 ( pg_backend_pid PGNSP PGUID 12 1 0 0 0 f f f f t f s r 0 0 23 "" _null_ _null_ _null_ _null_ _null_ pg_backend_pid _null_ _null_ _null_ )); DESCR("statistics: current backend PID"); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h new file mode 100644 index 0000000..24bd527 --- /dev/null +++ b/src/include/catalog/pg_subscription_rel.h @@ -0,0 +1,77 @@ +/* ------------------------------------------------------------------------- + * + * pg_subscription_rel.h + * Local info about tables that come from the publisher of a + * subscription (pg_subscription_rel). + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * ------------------------------------------------------------------------- + */ +#ifndef PG_SUBSCRIPTION_REL_H +#define PG_SUBSCRIPTION_REL_H + +#include "catalog/genbki.h" + +/* ---------------- + * pg_subscription_rel definition. 
cpp turns this into + * typedef struct FormData_pg_subscription_rel + * ---------------- + */ +#define SubscriptionRelRelationId 6102 +#define SubscriptionRelRelation_Rowtype_Id 6103 + +/* Workaround for genbki not knowing about XLogRecPtr */ +#define pg_lsn XLogRecPtr + +CATALOG(pg_subscription_rel,6102) BKI_ROWTYPE_OID(6103) +{ + Oid srsubid; /* Oid of subscription */ + Oid srrelid; /* Oid of relation */ + char srsubstate; /* state of the relation in subscription */ + pg_lsn srsublsn; /* remote lsn of the state change + * used for synchronization coordination */ +} FormData_pg_subscription_rel; + +typedef FormData_pg_subscription_rel *Form_pg_subscription_rel; + +/* ---------------- + * compiler constants for pg_subscription_rel + * ---------------- + */ +#define Natts_pg_subscription_rel 4 +#define Anum_pg_subscription_rel_srsubid 1 +#define Anum_pg_subscription_rel_srrelid 2 +#define Anum_pg_subscription_rel_srsubstate 3 +#define Anum_pg_subscription_rel_srsublsn 4 + +/* ---------------- + * substate constants + * ---------------- + */ +#define SUBREL_STATE_UNKNOWN '\0' /* unknown state (never stored in table) */ +#define SUBREL_STATE_INIT 'i' /* initializing (sublsn NULL) */ +#define SUBREL_STATE_DATA 'd' /* data copy (sublsn NULL) */ +#define SUBREL_STATE_SYNCWAIT 'w' /* waiting for sync (sublsn set) */ +#define SUBREL_STATE_CATCHUP 'c' /* catchup (sublsn set) */ +#define SUBREL_STATE_SYNCDONE 's' /* synced (sublsn set) */ +#define SUBREL_STATE_READY 'r' /* ready (sublsn NULL) */ + +typedef struct SubscriptionRelState +{ + Oid relid; + XLogRecPtr lsn; + char state; +} SubscriptionRelState; + +extern Oid SetSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn); +extern char GetSubscriptionRelState(Oid subid, Oid relid, + XLogRecPtr *sublsn, bool missing_ok); +extern void RemoveSubscriptionRel(Oid subid, Oid relid); + +extern List *GetSubscriptionRelations(Oid subid); +extern List *GetSubscriptionNotReadyRelations(Oid subid); + +#endif /* PG_SUBSCRIPTION_REL_H */ diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index d63ca0f..f081f22 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -21,6 +21,7 @@ /* CopyStateData is private in commands/copy.c */ typedef struct CopyStateData *CopyState; +typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, @@ -28,7 +29,7 @@ extern void DoCopy(ParseState *state, const CopyStmt *stmt, extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options); extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, - bool is_program, List *attnamelist, List *options); + bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options); extern void EndCopyFrom(CopyState cstate); extern bool NextCopyFrom(CopyState cstate, ExprContext *econtext, Datum *values, bool *nulls, Oid *tupleOid); @@ -36,6 +37,8 @@ extern bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); +extern uint64 CopyFrom(CopyState cstate); + extern DestReceiver *CreateCopyDestReceiver(void); #endif /* COPY_H */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 4669245..acafa62 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -475,6 +475,7 @@ typedef enum NodeTag T_DropReplicationSlotCmd, T_StartReplicationCmd, 
T_TimeLineHistoryCmd, + T_SQLCmd, /* * TAGS FOR RANDOM OTHER STUFF diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index aad4699..3e81def 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3284,10 +3284,22 @@ typedef struct CreateSubscriptionStmt List *options; /* List of DefElem nodes */ } CreateSubscriptionStmt; +typedef enum AlterSubscriptionType +{ + ALTER_SUBSCRIPTION_OPTIONS, + ALTER_SUBSCRIPTION_CONNECTION, + ALTER_SUBSCRIPTION_PUBLICATION, + ALTER_SUBSCRIPTION_REFRESH, + ALTER_SUBSCRIPTION_ENABLED +} AlterSubscriptionType; + typedef struct AlterSubscriptionStmt { NodeTag type; + AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */ char *subname; /* Name of of the subscription */ + char *conninfo; /* Connection string to publisher */ + List *publication; /* One or more publication to subscribe to */ List *options; /* List of DefElem nodes */ } AlterSubscriptionStmt; diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index f27354f..4f4c29e 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -96,4 +96,9 @@ typedef struct TimeLineHistoryCmd TimeLineID timeline; } TimeLineHistoryCmd; +typedef struct SQLCmd +{ + NodeTag type; +} SQLCmd; + #endif /* REPLNODES_H */ diff --git a/src/include/pgstat.h b/src/include/pgstat.h index de8225b..77574e0 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -787,7 +787,9 @@ typedef enum WAIT_EVENT_MQ_SEND, WAIT_EVENT_PARALLEL_FINISH, WAIT_EVENT_SAFE_SNAPSHOT, - WAIT_EVENT_SYNC_REP + WAIT_EVENT_SYNC_REP, + WAIT_EVENT_LOGICAL_SYNC_DATA, + WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE } WaitEventIPC; /* ---------- diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index fd34964..a5088c4 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -31,9 +31,11 @@ typedef struct LogicalDecodingContext /* memory context this is all allocated in */ MemoryContext context; - /* infrastructure pieces */ - XLogReaderState *reader; + /* The associated replication slot */ ReplicationSlot *slot; + + /* infrastructure pieces for decoding */ + XLogReaderState *reader; struct ReorderBuffer *reorder; struct SnapBuild *snapshot_builder; @@ -75,6 +77,7 @@ typedef struct LogicalDecodingContext TransactionId write_xid; } LogicalDecodingContext; + extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, @@ -92,6 +95,14 @@ extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); +extern LogicalDecodingContext *CreateCopyDecodingContext( + List *output_plugin_options, + LogicalOutputPluginWriterPrepareWrite prepare_write, + LogicalOutputPluginWriterWrite do_write); +extern void DecodingContextProccessTuple(LogicalDecodingContext *ctx, + Relation rel, HeapTuple tup); +extern List *DecodingContextGetTableList(LogicalDecodingContext *ctx); + extern void LogicalIncreaseXminForSlot(XLogRecPtr lsn, TransactionId xmin); extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 715ac7f..72189ce 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -13,6 +13,7 @@ #define LOGICALLAUNCHER_H extern int 
max_logical_replication_workers; +extern int max_sync_workers_per_subscription; extern void ApplyLauncherRegister(void); extern void ApplyLauncherMain(Datum main_arg); diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 5e824ae..091a9f9 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -59,6 +59,7 @@ extern void FreeSnapshotBuilder(SnapBuild *cache); extern void SnapBuildSnapDecRefcount(Snapshot snap); +extern Snapshot SnapBuildInitalSnapshot(SnapBuild *builder); extern const char *SnapBuildExportSnapshot(SnapBuild *snapstate); extern void SnapBuildClearExportedSnapshot(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index fe35404..d7ab12f 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -15,6 +15,7 @@ #include "access/xlog.h" #include "access/xlogdefs.h" #include "fmgr.h" +#include "replication/logicalproto.h" #include "storage/latch.h" #include "storage/spin.h" #include "pgtime.h" @@ -186,6 +187,15 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, XLogRecPtr *lsn); typedef bool (*walrcv_command_fn) (WalReceiverConn *conn, const char *cmd, char **err); +typedef List *(*walrcv_table_list_fn) (WalReceiverConn *conn, + List *publications); +typedef void (*walrcv_table_info_fn) (WalReceiverConn *conn, + const char *nspname, + const char *relname, + LogicalRepRelation *lrel); +typedef void (*walrcv_table_copy_fn) (WalReceiverConn *conn, + const char *nspname, + const char *relname); typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn); typedef struct WalReceiverFunctionsType @@ -201,6 +211,9 @@ typedef struct WalReceiverFunctionsType walrcv_send_fn walrcv_send; walrcv_create_slot_fn walrcv_create_slot; walrcv_command_fn walrcv_command; + walrcv_table_list_fn walrcv_table_list; + walrcv_table_info_fn walrcv_table_info; + walrcv_table_copy_fn walrcv_table_copy; walrcv_disconnect_fn walrcv_disconnect; } WalReceiverFunctionsType; @@ -228,6 +241,12 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, lsn) #define walrcv_command(conn, cmd, err) \ WalReceiverFunctions->walrcv_command(conn, cmd, err) +#define walrcv_table_list(conn, publications) \ + WalReceiverFunctions->walrcv_table_list(conn, publications) +#define walrcv_table_info(conn, nspname, relname, lrel) \ + WalReceiverFunctions->walrcv_table_info(conn, nspname, relname, lrel) +#define walrcv_table_copy(conn, nspname, relname) \ + WalReceiverFunctions->walrcv_table_copy(conn, nspname, relname) #define walrcv_disconnect(conn) \ WalReceiverFunctions->walrcv_disconnect(conn) diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index f1b1993..bb94834 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -28,7 +28,7 @@ extern int wal_sender_timeout; extern bool log_replication_commands; extern void InitWalSender(void); -extern void exec_replication_command(const char *query_string); +extern bool exec_replication_command(const char *query_string); extern void WalSndErrorCleanup(void); extern void WalSndSignals(void); extern Size WalSndShmemSize(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index cecd2b8..403cd8a 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -40,6 +40,9 
@@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; +/* Memory context for cached variables in apply worker. */ +MemoryContext ApplyCacheContext; + /* libpqreceiver connection */ extern struct WalReceiverConn *wrconn; @@ -51,12 +54,19 @@ extern bool in_remote_transaction; extern bool got_SIGTERM; extern void logicalrep_worker_attach(int slot); -extern LogicalRepWorker *logicalrep_worker_find(Oid subid); -extern int logicalrep_worker_count(Oid subid); -extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid); -extern void logicalrep_worker_stop(Oid subid); -extern void logicalrep_worker_wakeup(Oid subid); +extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, + bool only_running); +extern void logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, + Oid userid, Oid relid); +extern void logicalrep_worker_stop(Oid subid, Oid relid); +extern void logicalrep_worker_wakeup(Oid subid, Oid relid); + +extern int logicalrep_sync_worker_count(Oid subid); extern void logicalrep_worker_sigterm(SIGNAL_ARGS); +extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos); +void process_syncing_tables(char *slotname, XLogRecPtr end_lsn); +void invalidate_syncing_table_states(Datum arg, int cacheid, + uint32 hashvalue); #endif /* WORKER_INTERNAL_H */ diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index e1bb344..e581163 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -522,6 +522,7 @@ extern Datum pg_collation_for(PG_FUNCTION_ARGS); extern Datum pg_relation_is_updatable(PG_FUNCTION_ARGS); extern Datum pg_column_is_updatable(PG_FUNCTION_ARGS); extern Datum parse_ident(PG_FUNCTION_ARGS); +extern Datum pg_get_replica_identity_index(PG_FUNCTION_ARGS); /* oid.c */ extern Datum oidin(PG_FUNCTION_ARGS); diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index 66f60d2..e3c7ef0 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -89,6 +89,8 @@ enum SysCacheIdentifier STATRELATTINH, SUBSCRIPTIONOID, SUBSCRIPTIONNAME, + SUBSCRIPTIONRELOID, + SUBSCRIPTIONRELMAP, TABLESPACEOID, TRFOID, TRFTYPELANG, diff --git a/src/test/README b/src/test/README index 62395e7..74bab09 100644 --- a/src/test/README +++ b/src/test/README @@ -37,5 +37,8 @@ regress/ ssl/ Tests to exercise and verify SSL certificate handling +subscription/ + Test suite for subscriptions and logical replication + thread/ A thread-safety-testing utility used by configure diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index ccf1453..286899e 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1833,13 +1833,14 @@ pg_stat_ssl| SELECT s.pid, pg_stat_subscription| SELECT su.oid AS subid, su.subname, st.pid, + st.relid, st.received_lsn, st.last_msg_send_time, st.last_msg_receipt_time, st.latest_end_lsn, st.latest_end_time FROM (pg_subscription su - LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); + LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, diff --git a/src/test/regress/expected/sanity_check.out 
b/src/test/regress/expected/sanity_check.out index 0af013f..ff3ef40 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -136,6 +136,7 @@ pg_shdescription|t pg_shseclabel|t pg_statistic|t pg_subscription|t +pg_subscription_rel|t pg_tablespace|t pg_transform|t pg_trigger|t diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index cb1ab4e..35ca05c 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -13,12 +13,11 @@ CREATE SUBSCRIPTION testsub PUBLICATION foo; ERROR: syntax error at or near "PUBLICATION" LINE 1: CREATE SUBSCRIPTION testsub PUBLICATION foo; ^ -set client_min_messages to error; CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; ERROR: invalid connection string syntax: missing "=" after "testconn" in connection info string -CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT); -reset client_min_messages; +CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (SKIP CONNECT); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ List of subscriptions Name | Owner | Enabled | Publication | Conninfo @@ -26,47 +25,38 @@ reset client_min_messages; testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist (1 row) -ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3; -\dRs - List of subscriptions - Name | Owner | Enabled | Publication ----------+---------------------------+---------+--------------------- - testsub | regress_subscription_user | f | {testpub2,testpub3} -(1 row) - ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2'; -ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1; \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Conninfo ----------+---------------------------+---------+--------------------+---------------------- - testsub | regress_subscription_user | f | {testpub,testpub1} | dbname=doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Conninfo +---------+---------------------------+---------+-------------+---------------------- + testsub | regress_subscription_user | f | {testpub} | dbname=doesnotexist2 (1 row) BEGIN; ALTER SUBSCRIPTION testsub ENABLE; \dRs - List of subscriptions - Name | Owner | Enabled | Publication ----------+---------------------------+---------+-------------------- - testsub | regress_subscription_user | t | {testpub,testpub1} + List of subscriptions + Name | Owner | Enabled | Publication +---------+---------------------------+---------+------------- + testsub | regress_subscription_user | t | {testpub} (1 row) ALTER SUBSCRIPTION testsub DISABLE; \dRs - List of subscriptions - Name | Owner | Enabled | Publication ----------+---------------------------+---------+-------------------- - testsub | regress_subscription_user | f | {testpub,testpub1} + List of subscriptions + Name | Owner | Enabled | Publication +---------+---------------------------+---------+------------- + testsub | regress_subscription_user | f | {testpub} (1 row) COMMIT; ALTER SUBSCRIPTION testsub RENAME TO testsub_foo; \dRs - List of subscriptions - Name | Owner | Enabled | Publication --------------+---------------------------+---------+-------------------- - testsub_foo | regress_subscription_user | f | {testpub,testpub1} + List of subscriptions 
+ Name | Owner | Enabled | Publication +-------------+---------------------------+---------+------------- + testsub_foo | regress_subscription_user | f | {testpub} (1 row) DROP SUBSCRIPTION testsub_foo NODROP SLOT; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index fce6069..3c8989b 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -11,19 +11,12 @@ CREATE SUBSCRIPTION testsub CONNECTION 'foo'; -- fail - no connection CREATE SUBSCRIPTION testsub PUBLICATION foo; -set client_min_messages to error; CREATE SUBSCRIPTION testsub CONNECTION 'testconn' PUBLICATION testpub; -CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (DISABLED, NOCREATE SLOT); -reset client_min_messages; +CREATE SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist' PUBLICATION testpub WITH (SKIP CONNECT); \dRs+ -ALTER SUBSCRIPTION testsub SET PUBLICATION testpub2, testpub3; - -\dRs - ALTER SUBSCRIPTION testsub CONNECTION 'dbname=doesnotexist2'; -ALTER SUBSCRIPTION testsub SET PUBLICATION testpub, testpub1; \dRs+ diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index a9c4b01..143b005 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 11; +use Test::More tests => 13; # Initialize publisher node my $node_publisher = get_new_node('publisher'); @@ -19,7 +19,7 @@ $node_subscriber->start; $node_publisher->safe_psql('postgres', "CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a"); $node_publisher->safe_psql('postgres', - "CREATE TABLE tab_ins (a int)"); + "CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a"); $node_publisher->safe_psql('postgres', "CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a"); $node_publisher->safe_psql('postgres', @@ -56,10 +56,20 @@ my $caughtup_query = $node_publisher->poll_query_until('postgres', $caughtup_query) or die "Timed out while waiting for subscriber to catch up"; +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep"); is($result, qq(0), 'check non-replicated table is empty on subscriber'); +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); +is($result, qq(1002), 'check initial data was copied to subscriber'); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_ins SELECT generate_series(1,50)"); $node_publisher->safe_psql('postgres', @@ -79,7 +89,7 @@ $node_publisher->poll_query_until('postgres', $caughtup_query) $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); -is($result, qq(50|1|50), 'check replicated inserts on subscriber'); +is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); @@ -109,7 +119,7 @@ $node_publisher->poll_query_until('postgres', $caughtup_query) $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); -is($result, qq(10|1|100), 'update works with REPLICA 
IDENTITY FULL and duplicate tuples'); +is($result, qq(20|1|100), 'update works with REPLICA IDENTITY FULL and duplicate tuples'); # check that change of connection string and/or publication list causes # restart of subscription workers. Not all of these are registered as tests @@ -126,7 +136,7 @@ $node_publisher->poll_query_until('postgres', $oldpid = $node_publisher->safe_psql('postgres', "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname';"); $node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only"); + "ALTER SUBSCRIPTION tap_sub SET PUBLICATION tap_pub_ins_only WITH (NOCOPY DATA)"); $node_publisher->poll_query_until('postgres', "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname';") or die "Timed out while waiting for apply to restart"; @@ -141,7 +151,7 @@ $node_publisher->poll_query_until('postgres', $caughtup_query) $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); -is($result, qq(150|1|1100), 'check replicated inserts after subscription publication change'); +is($result, qq(1152|1|1100), 'check replicated inserts after subscription publication change'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_rep"); @@ -154,6 +164,8 @@ $node_publisher->safe_psql('postgres', "ALTER PUBLICATION tap_pub_ins_only ADD TABLE tab_full"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 0"); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION WITH (NOCOPY DATA)"); $node_publisher->safe_psql('postgres', "INSERT INTO tab_full VALUES(0)"); @@ -163,11 +175,11 @@ $node_publisher->poll_query_until('postgres', $caughtup_query) # note that data are different on provider and subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); -is($result, qq(50|1|50), 'check replicated deletes after alter publication'); +is($result, qq(1052|1|1002), 'check replicated deletes after alter publication'); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_full"); -is($result, qq(11|0|100), 'check replicated insert after alter publication'); +is($result, qq(21|0|100), 'check replicated insert after alter publication'); # check restart on rename $oldpid = $node_publisher->safe_psql('postgres', @@ -190,8 +202,12 @@ $result = is($result, qq(0), 'check replication slot was dropped on publisher'); $result = - $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_replication_origin"); -is($result, qq(0), 'check replication origin was dropped on subscriber'); + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription_rel"); +is($result, qq(0), 'check subscription relation status was dropped on subscriber'); + +$result = + $node_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots"); +is($result, qq(0), 'check replication slot was dropped on publisher'); $node_subscriber->stop('fast'); $node_publisher->stop('fast'); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl index 9064eb4..6ac29f7 100644 --- a/src/test/subscription/t/002_types.pl +++ b/src/test/subscription/t/002_types.pl @@ -111,6 +111,12 @@ my $caughtup_query = $node_publisher->poll_query_until('postgres', $caughtup_query) or die "Timed out while waiting for subscriber to catch up"; +# Wait for initial sync to finish as well +my $synced_query 
= +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('s', 'r');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + # Insert initial test data $node_publisher->safe_psql('postgres', qq( -- test_tbl_one_array_col diff --git a/src/test/subscription/t/003_sync.pl b/src/test/subscription/t/003_sync.pl new file mode 100644 index 0000000..756e69d --- /dev/null +++ b/src/test/subscription/t/003_sync.pl @@ -0,0 +1,159 @@ +# Basic logical replication test +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +# Initialize publisher node +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = get_new_node('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(1,10)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep (a int primary key)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR ALL TABLES"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"); + +# Wait for subscriber to finish initialization +my $caughtup_query = +"SELECT pg_current_xlog_location() <= replay_location FROM pg_stat_replication WHERE application_name = '$appname';"; +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +# Also wait for initial table sync to finish +my $synced_query = +"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(10), 'initial data synced for first sub'); + +# drop subscription so that there is unreplicated data +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep SELECT generate_series(11,20)"); + +# recreate the subscription, it will try to do initial copy +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"); + +# but it will be stuck on data copy as it will fail on constraint +my $started_query = +"SELECT srsubstate = 'd' FROM pg_subscription_rel;"; +$node_subscriber->poll_query_until('postgres', $started_query) + or die "Timed out while waiting for subscriber to start sync"; + +# remove the conflicting data +$node_subscriber->safe_psql('postgres', + "DELETE FROM tab_rep;"); + +# wait for sync to finish this time +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that all data is synced +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM 
tab_rep"); +is($result, qq(20), 'initial data synced for second sub'); + +# now check another subscription for the same node pair +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"); + +# wait for it to start +$node_subscriber->poll_query_until('postgres', "SELECT pid IS NOT NULL FROM pg_stat_subscription WHERE subname = 'tap_sub2' AND relid IS NULL") + or die "Timed out while waiting for subscriber to start"; + +# and drop both subscriptions +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2"); + +# check subscriptions are removed +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); +is($result, qq(0), 'second and third sub are dropped'); + +# remove the conflicting data +$node_subscriber->safe_psql('postgres', + "DELETE FROM tab_rep;"); + +# recreate the subscription again +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub"); + +# and wait for data sync to finish again +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# check that all data is synced +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep"); +is($result, qq(20), 'initial data synced for fourth sub'); + +# add new table on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_rep_next (a int)"); + +# setup structure with existing data on pubisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_rep_next (a) AS SELECT generate_series(1,10)"); + +# Wait for subscription to catch up +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); +is($result, qq(0), 'no data for table added after subscription initialized'); + +# ask for data sync +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub REFRESH PUBLICATION"); + +# wait for sync to finish +$node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); +is($result, qq(10), 'data for table added after subscription initialized are now synced'); + +# Add some data +$node_publisher->safe_psql('postgres', + "INSERT INTO tab_rep_next SELECT generate_series(1,10)"); + +# Wait for subscription to catch up +$node_publisher->poll_query_until('postgres', $caughtup_query) + or die "Timed out while waiting for subscriber to catch up"; + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep_next"); +is($result, qq(20), 'changes for table added after subscription initialized replicated'); + +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); -- 2.7.4