From: | Peter Smith <smithpb2250(at)gmail(dot)com> |
---|---|
To: | "Zhijie Hou (Fujitsu)" <houzj(dot)fnst(at)fujitsu(dot)com> |
Cc: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, shveta malik <shveta(dot)malik(at)gmail(dot)com>, "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Nisha Moond <nisha(dot)moond412(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org> |
Subject: | Re: Synchronizing slots from primary to standby |
Date: | 2023-12-11 07:32:24 |
Message-ID: | CAHut+Psf9z132WNgy0Gr10ZTnonpNjvTBj74wG8kSxXU4rOD7g@mail.gmail.com |
Lists: | pgsql-hackers |
Here are some review comments for v44-0001
======
src/backend/replication/slot.c
1. ReplicationSlotCreate
* during getting changes, if the two_phase option is enabled it can skip
* prepare because by that time start decoding point has been moved. So the
* user will only get commit prepared.
+ * failover: Allows the slot to be synced to physical standbys so that logical
+ * replication can be resumed after failover.
*/
void
ReplicationSlotCreate(const char *name, bool db_specific,
~
/Allows the slot.../If enabled, allows the slot.../
======
2. validate_standby_slots
+validate_standby_slots(char **newval)
+{
+ char *rawname;
+ List *elemlist;
+ ListCell *lc;
+ bool ok = true;
+
+ /* Need a modifiable copy of string */
+ rawname = pstrdup(*newval);
+
+ /* Verify syntax and parse string into list of identifiers */
+ if (!(ok = SplitIdentifierString(rawname, ',', &elemlist)))
+ GUC_check_errdetail("List syntax is invalid.");
+
+ /*
+ * If there is a syntax error in the name or if the replication slots'
+ * data is not initialized yet (i.e., we are in the startup process), skip
+ * the slot verification.
+ */
+ if (!ok || !ReplicationSlotCtl)
+ {
+ pfree(rawname);
+ list_free(elemlist);
+ return ok;
+ }
2a.
You don't need to initialize 'ok' during declaration because it is
assigned immediately anyway.
~
2b.
AFAIK assignment within a conditional like this is not a normal PG
coding style unless there is no other way to do it.
~
2c.
/into list/into a list/
SUGGESTION
/* Verify syntax and parse string into a list of identifiers */
ok = SplitIdentifierString(rawname, ',', &elemlist);
if (!ok)
GUC_check_errdetail("List syntax is invalid.");
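To illustrate 2a and 2b together, here is a minimal standalone sketch of the assign-then-test style. It is not the patch code: list_syntax_ok() is a hypothetical stand-in for SplitIdentifierString() that only rejects empty list elements, and validate_list() with its errdetail out-parameter is likewise invented for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical stand-in for SplitIdentifierString(): returns false when the
 * comma-separated list contains an empty element. An empty string is
 * accepted as an empty list. This is NOT the PostgreSQL routine.
 */
static bool
list_syntax_ok(const char *raw)
{
	const char *p = raw;

	assert(raw != NULL);

	if (*p == '\0')
		return true;

	for (;;)
	{
		size_t		len = strcspn(p, ",");

		if (len == 0)
			return false;		/* empty element */
		p += len;
		if (*p == '\0')
			return true;		/* reached the end of the list */
		p++;					/* skip the comma */
	}
}

/*
 * Hypothetical check hook: 'ok' has no initializer because it is assigned
 * before its first use, and the assignment is kept out of the condition.
 */
static bool
validate_list(const char *raw, const char **errdetail)
{
	bool		ok;

	ok = list_syntax_ok(raw);
	if (!ok)
		*errdetail = "List syntax is invalid.";

	return ok;
}
```

With this shape the result variable is written exactly once before it is read, so the initializer questioned in 2a is plainly unnecessary.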
~~~
3. assign_standby_slot_names
+ if (!SplitIdentifierString(standby_slot_names_cpy, ',', &standby_slots))
+ {
+ /* This should not happen if GUC checked check_standby_slot_names. */
+ elog(ERROR, "list syntax is invalid");
+ }
The error message here differs from the one in validate_standby_slots()
-- "list" versus "List".
======
src/backend/replication/walsender.c
4. WalSndFilterStandbySlots
+ foreach(lc, standby_slots_cpy)
+ {
+ char *name = lfirst(lc);
+ XLogRecPtr restart_lsn = InvalidXLogRecPtr;
+ bool invalidated = false;
+ char *warningfmt = NULL;
+ ReplicationSlot *slot;
+
+ slot = SearchNamedReplicationSlot(name, true);
+
+ if (slot && SlotIsPhysical(slot))
+ {
+ SpinLockAcquire(&slot->mutex);
+ restart_lsn = slot->data.restart_lsn;
+ invalidated = slot->data.invalidated != RS_INVAL_NONE;
+ SpinLockRelease(&slot->mutex);
+ }
+
+ /* Continue if the current slot hasn't caught up. */
+ if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) &&
+ restart_lsn < wait_for_lsn)
+ {
+ /* Log warning if no active_pid for this physical slot */
+ if (slot->active_pid == 0)
+ ereport(WARNING,
+ errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
+ name, "standby_slot_names"),
+ errdetail("Logical replication is waiting on the "
+ "standby associated with \"%s\"", name),
+ errhint("Consider starting standby associated with "
+ "\"%s\" or amend standby_slot_names", name));
+
+ continue;
+ }
+ else if (!slot)
+ {
+ /*
+ * It may happen that the slot specified in standby_slot_names GUC
+ * value is dropped, so let's skip over it.
+ */
+ warningfmt = _("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring");
+ }
+ else if (SlotIsLogical(slot))
+ {
+ /*
+ * If a logical slot name is provided in standby_slot_names, issue
+ * a WARNING and skip it. Although logical slots are disallowed in
+ * the GUC check_hook(validate_standby_slots), it is still
+ * possible for a user to drop an existing physical slot and
+ * recreate a logical slot with the same name. Since it is
+ * harmless, a WARNING should be enough, no need to error-out.
+ */
+ warningfmt = _("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring");
+ }
+ else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated)
+ {
+ /*
+ * Specified physical slot may have been invalidated, so there is no point
+ * in waiting for it.
+ */
+ warningfmt = _("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring");
+ }
+ else
+ {
+ Assert(restart_lsn >= wait_for_lsn);
+ }
This if/else chain seems structured awkwardly. IMO it would be tidier
to eliminate the NULL slot and SlotIsLogical cases up-front, which would
also simplify some of the subsequent conditions.
SUGGESTION
slot = SearchNamedReplicationSlot(name, true);
if (!slot)
{
...
}
else if (SlotIsLogical(slot))
{
...
}
else
{
Assert(SlotIsPhysical(slot));
SpinLockAcquire(&slot->mutex);
restart_lsn = slot->data.restart_lsn;
invalidated = slot->data.invalidated != RS_INVAL_NONE;
SpinLockRelease(&slot->mutex);
if (XLogRecPtrIsInvalid(restart_lsn) || invalidated)
{
...
}
else if (restart_lsn < wait_for_lsn)
{
...
}
else
{
Assert(restart_lsn >= wait_for_lsn);
}
}
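The control flow of this suggestion can be checked in isolation with a small model. The sketch below is an assumption-laden stand-in, not walsender code: ModelSlot, SlotVerdict and classify_slot() are hypothetical names, the spinlock is omitted, and an LSN of 0 is treated as invalid. It shows that once the missing-slot and logical-slot cases are handled first, and invalidation is tested before lag, the lag test needs only the LSN comparison.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t Lsn;			/* hypothetical stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)

/* Hypothetical, simplified model of the slot fields the loop looks at. */
typedef struct ModelSlot
{
	bool		is_logical;
	bool		invalidated;
	Lsn			restart_lsn;
} ModelSlot;

typedef enum SlotVerdict
{
	SLOT_MISSING,				/* dropped: warn and ignore */
	SLOT_LOGICAL,				/* not a physical slot: warn and ignore */
	SLOT_INVALIDATED,			/* no point waiting: warn and ignore */
	SLOT_LAGGING,				/* keep waiting for this slot */
	SLOT_CAUGHT_UP				/* nothing more to do for this slot */
} SlotVerdict;

static SlotVerdict
classify_slot(const ModelSlot *slot, Lsn wait_for_lsn)
{
	if (slot == NULL)
		return SLOT_MISSING;
	else if (slot->is_logical)
		return SLOT_LOGICAL;
	else
	{
		/* Only physical slots reach this branch. */
		Lsn			restart_lsn = slot->restart_lsn;
		bool		invalidated = slot->invalidated;

		if (restart_lsn == INVALID_LSN || invalidated)
			return SLOT_INVALIDATED;
		else if (restart_lsn < wait_for_lsn)
			return SLOT_LAGGING;
		else
		{
			assert(restart_lsn >= wait_for_lsn);
			return SLOT_CAUGHT_UP;
		}
	}
}
```

Each of the five outcomes maps to exactly one branch, so the slot pointer is never dereferenced before the NULL check.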
~~~~
5. WalSndWaitForWal
+ else
+ {
+ /* already caught up and doesn't need to wait for standby_slots */
break;
+ }
/already/Already/
======
src/test/recovery/t/050_standby_failover_slots_sync.pl
6.
+$subscriber1->safe_psql('postgres',
+ "CREATE TABLE tab_int (a int PRIMARY KEY);");
+
+# Create a subscription with failover = true
+$subscriber1->safe_psql('postgres',
+ "CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' "
+ . "PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true);"
+);
Consider combining these DDL statements.
~~~
7.
+$subscriber2->safe_psql('postgres',
+ "CREATE TABLE tab_int (a int PRIMARY KEY);");
+$subscriber2->safe_psql('postgres',
+ "CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' "
+ . "PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);");
Consider combining these DDL statements.
~~~
8.
+# Stop the standby associated with specified physical replication slot so that
+# the logical replication slot won't receive changes until the standby slot's
+# restart_lsn is advanced or the slots is removed from the standby_slot_names
+# list
+$publisher->safe_psql('postgres', "TRUNCATE tab_int;");
+$publisher->wait_for_catchup('regress_mysub1');
+$standby1->stop;
/with specified/with the specified/
/or the slots is/or the slot is/
~~~
9.
+# Create some data on primary
/on primary/on the primary/
~~~
10.
+$result =
+ $subscriber1->safe_psql('postgres', "SELECT count(*) = 10 FROM tab_int;");
+is($result, 't',
+ "subscriber1 doesn't get data as the sb1_slot doesn't catch up");
Instead of checking for 10, I felt it might be more consistent with the
previous code to assign 20 to the $primary_row_count variable again, and
then check that not all of those primary rows have been received yet, like:
SELECT count(*) < $primary_row_count FROM tab_int;
~~~
11.
+# Now that the standby lsn has advanced, primary must send the decoded
+# changes to the subscription.
+$publisher->wait_for_catchup('regress_mysub1');
+$result =
+ $subscriber1->safe_psql('postgres', "SELECT count(*) = 20 FROM tab_int;");
+is($result, 't',
+ "subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list"
+);
/primary must/the primary must/
(continuing the suggestion from the previous review comment)
Now this SQL can use the variable too:
$subscriber1->safe_psql('postgres', "SELECT count(*) = $primary_row_count FROM tab_int;");
~~~
12.
+
+# Create another subscription enabling failover
+$subscriber1->safe_psql('postgres',
+ "CREATE SUBSCRIPTION regress_mysub3 CONNECTION '$publisher_connstr' "
+ . "PUBLICATION regress_mypub WITH (slot_name = lsub3_slot, copy_data=false, failover = true, create_slot = false);"
+);
Maybe give some more information in that comment:
SUGGESTION
Create another subscription (using the same slot created above) that
enables failover.
======
Kind Regards,
Peter Smith.
Fujitsu Australia