diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 3ac4a4b..cba6661 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2050,21 +2050,6 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" - TWO_PHASE [ boolean ] - - - If true, this logical replication slot supports decoding of two-phase - commit. With this option, commands related to two-phase commit such as - PREPARE TRANSACTION, COMMIT PREPARED - and ROLLBACK PREPARED are decoded and transmitted. - The transaction will be decoded and transmitted at - PREPARE TRANSACTION time. - The default is false. - - - - - RESERVE_WAL [ boolean ] @@ -2104,6 +2089,21 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + TWO_PHASE [ boolean ] + + + If true, this logical replication slot supports decoding of two-phase + commit. With this option, commands related to two-phase commit such as + PREPARE TRANSACTION, COMMIT PREPARED + and ROLLBACK PREPARED are decoded and transmitted. + The transaction will be decoded and transmitted at + PREPARE TRANSACTION time. + The default is false. 
+ + + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 35bce68..f3c6e1f 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2741,5 +2741,6 @@ LookupGXactBySubid(Oid subid) } } LWLockRelease(TwoPhaseStateLock); + return found; } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 6995a62..3703cf6 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1076,6 +1076,8 @@ CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel) { StringInfoData cmd; + Assert(strcmp(option, "failover") == 0 || strcmp(option, "two_phase") == 0); + if (!sub->slotname) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -1098,8 +1100,8 @@ CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel) appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option); /* - * The changed option of the slot can't be rolled back: prevent we are in - * the transaction state. + * The changed option of the slot can't be rolled back, so disallow if we + * are in a transaction block. */ PreventInTransactionBlock(isTopLevel, cmd.data); @@ -1282,7 +1284,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, /* * Workers may still survive even if the subscription has * been disabled. They may read the pg_subscription - * catalog and detect that the twophase parameter is + * catalog and detect that the two_phase parameter is * updated, which causes the assertion failure. Ensure * workers have already been exited to avoid it. 
*/ @@ -1304,7 +1306,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("cannot disable two_phase when prepared transactions are present"), errhint("Resolve these transactions and try again."))); - /* Change system catalog acoordingly */ + /* Change system catalog accordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 45744b7..c566d50 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -272,15 +272,15 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) * the subscription, instead of just one. */ List * -logicalrep_workers_find(Oid subid, bool only_running, bool require_lock) +logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock) { int i; List *res = NIL; - if (require_lock) + if (acquire_lock) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - else - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); /* Search for attached worker for a given subscription id. */ for (i = 0; i < max_logical_replication_workers; i++) @@ -291,7 +291,7 @@ logicalrep_workers_find(Oid subid, bool only_running, bool require_lock) res = lappend(res, w); } - if (require_lock) + if (acquire_lock) LWLockRelease(LogicalRepWorkerLock); return res; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2f167a2..e75f24b 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -842,23 +842,25 @@ ReplicationSlotAlter(const char *name, bool *failover, bool *two_phase) " on the standby")); } - /* - * Do not allow users to enable failover for temporary slots as we do not - * support syncing temporary slots to the standby. 
- */ - if (failover && *failover && - MyReplicationSlot->data.persistency == RS_TEMPORARY) + if (failover) + { + /* + * Do not allow users to enable failover for temporary slots as we do not + * support syncing temporary slots to the standby. + */ + if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot enable failover for a temporary replication slot")); - if (failover && MyReplicationSlot->data.failover != *failover) - { - SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.failover = *failover; - SpinLockRelease(&MyReplicationSlot->mutex); + if (MyReplicationSlot->data.failover != *failover) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.failover = *failover; + SpinLockRelease(&MyReplicationSlot->mutex); - update_slot = true; + update_slot = true; + } } if (two_phase && MyReplicationSlot->data.two_phase != *two_phase) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 04f65e0..af8e958 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1405,56 +1405,42 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) ReplicationSlotDrop(cmd->slotname, !cmd->wait); } + /* - * Process extra options given to ALTER_REPLICATION_SLOT. + * Change the definition of a replication slot. 
*/ static void -ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, - bool *failover_given, bool *failover, - bool *two_phase_given, bool *two_phase) +AlterReplicationSlot(AlterReplicationSlotCmd *cmd) { - *failover_given = false; - *two_phase_given = false; + bool failover_given = false; + bool two_phase_given = false; + bool failover; + bool two_phase; /* Parse options */ foreach_ptr(DefElem, defel, cmd->options) { if (strcmp(defel->defname, "failover") == 0) { - if (*failover_given) + if (failover_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *failover_given = true; - *failover = defGetBoolean(defel); + failover_given = true; + failover = defGetBoolean(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { - if (*two_phase_given) + if (two_phase_given) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); - *two_phase_given = true; - *two_phase = defGetBoolean(defel); + two_phase_given = true; + two_phase = defGetBoolean(defel); } else elog(ERROR, "unrecognized option: %s", defel->defname); } -} - -/* - * Change the definition of a replication slot. - */ -static void -AlterReplicationSlot(AlterReplicationSlotCmd *cmd) -{ - bool failover_given; - bool two_phase_given; - bool failover; - bool two_phase; - - ParseAlterReplSlotOptions(cmd, &failover_given, &failover, - &two_phase_given, &two_phase); ReplicationSlotAlter(cmd->slotname, failover_given ? 
&failover : NULL, diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 990f524..9646261 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -241,7 +241,7 @@ extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running, - bool require_lock); + bool acquire_lock); extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 51fa4b9..40e1a07 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -377,7 +377,7 @@ HINT: To initiate replication, you must manually create the replication slot, e regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) --- We can alter streaming when two_phase enabled +-- we can alter streaming when two_phase is enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ List of subscriptions diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index a3886d7..b64f419 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -256,7 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); \dRs+ --- We can alter streaming when two_phase enabled +-- we can alter streaming when two_phase is enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ diff --git a/src/test/subscription/t/021_twophase.pl 
b/src/test/subscription/t/021_twophase.pl index 4e8f627..66265c7 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -371,8 +371,8 @@ is($result, qq(2), 'replicated data in subscriber table'); $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); ############################### -# Disable the subscription and alter it to two_phase = false, -# then verify that the altered subscription reflects the two_phase option. +# Alter the subscription to two_phase = false. +# Verify that the altered subscription reflects the two_phase option. ############################### # Alter subscription two_phase to false @@ -395,7 +395,10 @@ $result = $node_subscriber->safe_psql('postgres', ); is($result, qq(d), 'two-phase should be disabled'); -# Now do a prepare on the publisher and make sure that it is not replicated. +############################### +# Now do a prepare on the publisher. +# Verify that it is not replicated. +############################### $node_publisher->safe_psql( 'postgres', qq{ BEGIN; @@ -411,7 +414,10 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'should be no prepared transactions on subscriber'); -# Now commit the insert and verify that it is replicated +############################### +# Now commit the insert. +# Verify that it is replicated. +############################### $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';"); # Wait for the subscriber to catchup @@ -422,7 +428,10 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); is($result, qq(3), 'replicated data in subscriber table'); -# Alter subscription two_phase to true +############################### +# Alter the subscription to two_phase = true. +# Verify that the altered subscription reflects the two_phase option. 
+############################### $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); $node_subscriber->poll_query_until('postgres',