From: | Peter Smith <smithpb2250(at)gmail(dot)com> |
---|---|
To: | shveta malik <shveta(dot)malik(at)gmail(dot)com> |
Cc: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org> |
Subject: | Re: Synchronizing slots from primary to standby |
Date: | 2023-09-27 04:36:43 |
Message-ID: | CAHut+Ptprmg3stw7yPAGbGPmGr15sMEWCSzpUoHg85uTY19bDQ@mail.gmail.com |
Lists: | pgsql-hackers |
Here are some more review comments for the patch v19-0002.
This is a WIP.... these review comments are all for the file slotsync.c
======
src/backend/replication/logical/slotsync.c
1. wait_for_primary_slot_catchup
+ WalRcvExecResult *res;
+ TupleTableSlot *slot;
+ Oid slotRow[1] = {LSNOID};
+ StringInfoData cmd;
+ bool isnull;
+ XLogRecPtr restart_lsn;
+
+ for (;;)
+ {
+ int rc;
I could not see why 'rc' is declared within the loop when none of the
other local variables are. Personally, I'd declare all variables at
the deepest scope possible (e.g. inside the for loop).
~~~
2. get_local_synced_slot_names
+/*
+ * Get list of local logical slot names which are synchronized from
+ * primary and belongs to one of the DBs passed in.
+ */
+static List *
+get_local_synced_slot_names(Oid *dbids)
+{
IIUC, this function gets called only from the drop_obsolete_slots()
function. But I thought this list of local slot names (i.e. for the
dbids that this worker is handling) is something that perhaps
could be initialized one time for the worker, instead of being
re-calculated every single time the slots processing/dropping happens.
Isn't the current code expending too much effort recalculating over
and over but giving back the same list every time?
~~~
3. get_local_synced_slot_names
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ /* Check if it is logical synchronized slot */
+ if (s->in_use && SlotIsLogical(s) && s->data.synced)
+ {
+ for (int j = 0; j < MySlotSyncWorker->dbcount; j++)
+ {
Loop variables are not declared in the common PG code way.
~~~
4. slot_exists_locally
+static bool
+slot_exists_locally(List *remote_slots, ReplicationSlot *local_slot,
+ bool *locally_invalidated)
+{
+ ListCell *cell;
+
+ foreach(cell, remote_slots)
+ {
+ RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell);
+
+ if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
+ {
+ /*
+ * if remote slot is marked as non-conflicting (i.e. not
+ * invalidated) but local slot is marked as invalidated, then set
+ * the bool.
+ */
+ if (!remote_slot->conflicting &&
+ SlotIsLogical(local_slot) &&
+ local_slot->data.invalidated != RS_INVAL_NONE)
+ *locally_invalidated = true;
+
+ return true;
+ }
+ }
+
+ return false;
+}
Why is there a SlotIsLogical(local_slot) check buried in this
function? How is slot_exists_locally() getting called with a
non-logical local_slot? Shouldn't that have been screened out long
before here?
~~~
5. use_slot_in_query
+static bool
+use_slot_in_query(char *slot_name, Oid *dbids)
There are multiple non-standard for-loop variable declarations in this function.
~~~
6. compute_naptime
+ * The first slot managed by each worker is chosen for monitoring purpose.
+ * If the lsn of that slot changes during each sync-check time, then the
+ * nap time is kept at regular value of WORKER_DEFAULT_NAPTIME_MS.
+ * When no lsn change is observed for WORKER_INACTIVITY_THRESHOLD_MS
+ * time, then the nap time is increased to WORKER_INACTIVITY_NAPTIME_MS.
+ * This nap time is brought back to WORKER_DEFAULT_NAPTIME_MS as soon as
+ * lsn change is observed.
6a.
/regular value/the regular value/
/for WORKER_INACTIVITY_THRESHOLD_MS time/within the threshold period
(WORKER_INACTIVITY_THRESHOLD_MS)/
~
6b.
/as soon as lsn change is observed./as soon as another lsn change is observed./
~~~
7.
+ * The caller is supposed to ignore return-value of 0. The 0 value is returned
+ * for the slots other that slot being monitored.
+ */
+static long
+compute_naptime(RemoteSlot *remote_slot)
This rule about returning 0 seemed hacky to me. IMO it would be
a better API to pass long *naptime (which this function either updates
or doesn't update, depending on whether this is the "monitored" slot).
Knowing the current naptime is also useful to improve the function
logic (see review comment #10 below).
Also, since this function is really only toggling naptime between 2
values, it would be helpful to assert that:
Assert(*naptime == WORKER_DEFAULT_NAPTIME_MS || *naptime ==
WORKER_INACTIVITY_NAPTIME_MS);
~~~
8.
+ if (NameStr(MySlotSyncWorker->monitoring_info.slot_name)[0] == '\0')
+ {
+ /*
+ * First time, just update the name and lsn and return regular
+ * nap time. Start comparison from next time onward.
+ */
+ strcpy(NameStr(MySlotSyncWorker->monitoring_info.slot_name),
+ remote_slot->name);
I wasn't sure why it was necessary to identify the "monitoring" slot
by name. Why doesn't the compute_naptime just get called only for the
1st slot found in the tuple loop instead of all the strcmp business
trying to match monitor names?
And, if the monitored slot gets "dropped", then so what; next time
another slot will be the first tuple so will automatically take its
place, right?
~~~
9.
+ /*
+ * If new received lsn (remote one) is different from what we have in
+ * our local slot, then update last_update_time.
+ */
+ if (MySlotSyncWorker->monitoring_info.confirmed_lsn !=
+ remote_slot->confirmed_lsn)
+ MySlotSyncWorker->monitoring_info.last_update_time = now;
+
+ MySlotSyncWorker->monitoring_info.confirmed_lsn =
+ remote_slot->confirmed_lsn;
Doesn't it make more sense to also put that 'confirmed_lsn' assignment
under the same condition? e.g. No need to overwrite the same value
again.
~~~
10.
+ /* If the inactivity time reaches the threshold, increase nap time */
+ if (TimestampDifferenceExceeds(MySlotSyncWorker->monitoring_info.last_update_time,
+ now, WORKER_INACTIVITY_THRESHOLD_MS))
+ return WORKER_INACTIVITY_NAPTIME_MS;
+ else
+ return WORKER_DEFAULT_NAPTIME_MS;
+ }
Somehow this feels overcomplicated to me.
In reality, the naptime is only toggling between 2 values (DEFAULT and
INACTIVITY), so we should never need to test
TimestampDifferenceExceeds again and again on subsequent calls (there
might be 1000s of them).
Once naptime is WORKER_INACTIVITY_NAPTIME_MS we know to reset it back
to WORKER_DEFAULT_NAPTIME_MS only if
(MySlotSyncWorker->monitoring_info.confirmed_lsn !=
remote_slot->confirmed_lsn) is detected.
Basically, I think the algorithm should be like the code below:
TimestampTz now = GetCurrentTimestamp();

if (MySlotSyncWorker->monitoring_info.confirmed_lsn !=
    remote_slot->confirmed_lsn)
{
    MySlotSyncWorker->monitoring_info.last_update_time = now;
    MySlotSyncWorker->monitoring_info.confirmed_lsn = remote_slot->confirmed_lsn;

    /* Something changed; reset naptime to default. */
    *naptime = WORKER_DEFAULT_NAPTIME_MS;
}
else
{
    if (*naptime == WORKER_DEFAULT_NAPTIME_MS)
    {
        /* If the inactivity time reaches the threshold, increase nap time. */
        if (TimestampDifferenceExceeds(MySlotSyncWorker->monitoring_info.last_update_time,
                                       now, WORKER_INACTIVITY_THRESHOLD_MS))
            *naptime = WORKER_INACTIVITY_NAPTIME_MS;
    }
}
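
To make the toggling behaviour easier to check outside the server, here is a standalone sketch of the same idea. Everything PostgreSQL-specific is replaced by assumptions: the types LsnSketch, TimeMsSketch and MonitoringInfoSketch are hypothetical stand-ins, the three constants use made-up placeholder values, and "now" is passed in as a millisecond counter instead of calling GetCurrentTimestamp(). It is only meant to illustrate the control flow, not to be the patch code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Placeholder values; the patch's real constants are not shown in this mail. */
#define WORKER_DEFAULT_NAPTIME_MS      10L
#define WORKER_INACTIVITY_NAPTIME_MS   10000L
#define WORKER_INACTIVITY_THRESHOLD_MS 1000L

/* Hypothetical stand-ins for XLogRecPtr, TimestampTz and monitoring_info. */
typedef uint64_t LsnSketch;
typedef int64_t  TimeMsSketch;

typedef struct MonitoringInfoSketch
{
    LsnSketch    confirmed_lsn;
    TimeMsSketch last_update_time;
} MonitoringInfoSketch;

/*
 * Update *naptime in place for the monitored slot. "now" is a parameter
 * (an assumption of this sketch) so that the logic can be tested.
 */
static void
compute_naptime_sketch(MonitoringInfoSketch *info, LsnSketch remote_lsn,
                       TimeMsSketch now, long *naptime)
{
    /* naptime only ever toggles between these two values. */
    assert(*naptime == WORKER_DEFAULT_NAPTIME_MS ||
           *naptime == WORKER_INACTIVITY_NAPTIME_MS);

    if (info->confirmed_lsn != remote_lsn)
    {
        /* Something changed; remember it and reset naptime to default. */
        info->last_update_time = now;
        info->confirmed_lsn = remote_lsn;
        *naptime = WORKER_DEFAULT_NAPTIME_MS;
    }
    else if (*naptime == WORKER_DEFAULT_NAPTIME_MS &&
             now - info->last_update_time >= WORKER_INACTIVITY_THRESHOLD_MS)
    {
        /* No change for the whole threshold period; nap longer. */
        *naptime = WORKER_INACTIVITY_NAPTIME_MS;
    }
}
```

Note that once *naptime is already WORKER_INACTIVITY_NAPTIME_MS, the elapsed-time test is skipped entirely; only an lsn change brings it back to the default.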
~~~
11. get_remote_invalidation_cause
+/*
+ * Get Remote Slot's invalidation cause.
+ *
+ * This gets invalidation cause of remote slot.
+ */
+static ReplicationSlotInvalidationCause
+get_remote_invalidation_cause(WalReceiverConn *wrconn, char *slot_name)
+{
Isn't that function comment just repeating itself?
~~~
12.
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "select pg_get_slot_invalidation_cause(%s)",
+ quote_literal_cstr(slot_name));
Use uppercase "SELECT" for consistency with other SQL.
~~~
13.
+ /* Make things live outside TX context */
+ MemoryContextSwitchTo(oldctx);
+
+ initStringInfo(&cmd);
+ appendStringInfo(&cmd,
+ "select pg_get_slot_invalidation_cause(%s)",
+ quote_literal_cstr(slot_name));
+ res = walrcv_exec(wrconn, cmd.data, 1, slotRow);
+ pfree(cmd.data);
+
+ CommitTransactionCommand();
+
+ /* Switch to oldctx we saved */
+ MemoryContextSwitchTo(oldctx);
There are 2x MemoryContextSwitchTo(oldctx) here. Is that deliberate?
~~~
14.
+ if (res->status != WALRCV_OK_TUPLES)
+ ereport(ERROR,
+ (errmsg("could not fetch invalidation cuase for slot \"%s\" from"
+ " primary: %s", slot_name, res->err)));
typo /cuase/cause/
~~~
15.
+ slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+ if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+ ereport(ERROR,
+ (errmsg("slot \"%s\" disapeared from the primary",
+ slot_name)));
typo /disapeared/disappeared/
~~~
16. drop_obsolete_slots
+/*
+ * Drop obsolete slots
+ *
+ * Drop the slots which no longer need to be synced i.e. these either
+ * do not exist on primary or are no longer part of synchronize_slot_names.
+ *
+ * Also drop the slots which are valid on primary and got invalidated
+ * on standby due to conflict (say required rows removed on primary).
+ * The assumption is, these will get recreated in next sync-cycle and
+ * it is okay to drop and recreate such slots as long as these are not
+ * consumable on standby (which is the case currently).
+ */
/which no/that no/
/which are/that are/
/these will/that these will/
/and got invalidated/that got invalidated/
~~~
17.
+ /* If this slot is being monitored, clean-up the monitoring info */
+ if (strcmp(NameStr(local_slot->data.name),
+ NameStr(MySlotSyncWorker->monitoring_info.slot_name)) == 0)
+ {
+ MemSet(NameStr(MySlotSyncWorker->monitoring_info.slot_name), 0, NAMEDATALEN);
+ MySlotSyncWorker->monitoring_info.confirmed_lsn = 0;
+ MySlotSyncWorker->monitoring_info.last_update_time = 0;
+ }
Maybe it is better to assign InvalidXLogRecPtr instead of 0 to the cleared lsn.
~
Alternatively, consider just zapping the entire monitoring_info
structure in one go:
MemSet(&MySlotSyncWorker->monitoring_info, 0,
sizeof(MySlotSyncWorker->monitoring_info));
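
As a quick standalone illustration of the "zap the whole structure" alternative, the sketch below uses a hypothetical MonitorStateSketch struct whose fields are only guessed from the quoted hunk (the real monitoring_info definition is not shown in this mail), and plain memset in place of MemSet. Since InvalidXLogRecPtr is defined as 0, zeroing the struct also leaves the lsn field invalid.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

#define NAMEDATALEN 64          /* PostgreSQL's default value */

/* Hypothetical stand-in; field types are assumptions of this sketch. */
typedef struct MonitorStateSketch
{
    char     slot_name[NAMEDATALEN];
    uint64_t confirmed_lsn;     /* stand-in for XLogRecPtr */
    int64_t  last_update_time;  /* stand-in for TimestampTz */
} MonitorStateSketch;

/* Clear every field in one go instead of resetting them one by one. */
static void
reset_monitor_state(MonitorStateSketch *state)
{
    memset(state, 0, sizeof(*state));
}
```

This also means any field added to the struct later gets cleared without having to touch the cleanup code.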
~~~
18. construct_slot_query (calling use_slot_in_query)
This separation of functions (use_slot_in_query /
construct_slot_query) seems awkward to me. The use_slot_in_query()
function is only called by construct_slot_query(). I felt it might be
simpler to keep all the logic within construct_slot_query().
Furthermore, it seemed strange to iterate all the DBs (to populate the
"WHERE database IN" clause) and then iterate all the DBs multiple
times again in use_slot_in_query (looking for slots to populate the
"AND slot_name IN (" clause).
Maybe I misunderstand the reason for this structuring, but IMO it
would be simpler code to keep all the logic in construct_slot_query()
like:
a. Initialize with empty dblist, empty slotlist.
b. Iterate all dbids
- constructing the dblist as you go
- constructing the slot list as you go (if synchronize_slot_names is
not "" or "*")
c. Finally, build the query: basic + dblist-clause + optional slotlist-clause
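
A rough standalone sketch of steps a/b/c follows. It makes several assumptions that are not from the patch: DbEntrySketch is a hypothetical struct that already pairs each database name with the slot names wanted for it, buf_appendf is a made-up helper standing in for appendStringInfo, the base SELECT is only a plausible guess at the real query, and quoting is done naively (the patch would use quote_literal_cstr). It also assumes at least one database is passed in.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define MAX_SLOTS_PER_DB 4

/* Hypothetical stand-in: a database name plus the slots wanted for it. */
typedef struct DbEntrySketch
{
    const char *dbname;
    const char *slots[MAX_SLOTS_PER_DB];    /* unused entries are NULL */
} DbEntrySketch;

/* Append formatted text to a NUL-terminated buffer, truncating if full. */
static void
buf_appendf(char *buf, size_t cap, const char *fmt, ...)
{
    size_t      len = strlen(buf);
    va_list     ap;

    if (len + 1 >= cap)
        return;
    va_start(ap, fmt);
    vsnprintf(buf + len, cap - len, fmt, ap);
    va_end(ap);
}

static void
construct_slot_query_sketch(char *query, size_t cap,
                            const DbEntrySketch *dbs, int ndbs,
                            bool filter_by_slot_name)
{
    char        dblist[256] = "";       /* (a) both lists start empty */
    char        slotlist[256] = "";
    int         i;
    int         j;

    /* (b) a single pass over the databases fills both lists */
    for (i = 0; i < ndbs; i++)
    {
        buf_appendf(dblist, sizeof(dblist), "%s'%s'",
                    dblist[0] ? ", " : "", dbs[i].dbname);

        if (!filter_by_slot_name)
            continue;

        for (j = 0; j < MAX_SLOTS_PER_DB && dbs[i].slots[j] != NULL; j++)
            buf_appendf(slotlist, sizeof(slotlist), "%s'%s'",
                        slotlist[0] ? ", " : "", dbs[i].slots[j]);
    }

    /* (c) basic query + dblist clause + optional slotlist clause */
    query[0] = '\0';
    buf_appendf(query, cap,
                "SELECT slot_name FROM pg_replication_slots"
                " WHERE database IN (%s)", dblist);
    if (slotlist[0] != '\0')
        buf_appendf(query, cap, " AND slot_name IN (%s)", slotlist);
}
```

Here filter_by_slot_name plays the role of "synchronize_slot_names is not "" or "*"", and the function has no need for a boolean result.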
~~~
19. construct_slot_query
Why does this function return a boolean? I only see it return true,
never false.
~~~
20.
+ {
+ ListCell *lc;
+ bool first_slot = true;
+
+
+ foreach(lc, sync_slot_names_list)
Unnecessary blank line.
~~~
21. synchronize_one_slot
+/*
+ * Synchronize single slot to given position.
+ *
+ * This creates new slot if there is no existing one and updates the
+ * metadata of existing slots as per the data received from the primary.
+ */
+static void
+synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot)
/creates new slot/creates a new slot/
/metadata of existing slots/metadata of the slot/
~~~
22.
+ /* Search for the named slot and mark it active if we find it. */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ for (int i = 0; i < max_replication_slots; i++)
+ {
+ ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+
+ if (!s->in_use)
+ continue;
+
+ if (strcmp(NameStr(s->data.name), remote_slot->name) == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+ LWLockRelease(ReplicationSlotControlLock);
22a.
"and mark it active if we find it." -- What code here is marking
anything active?
~
22b.
This is an uncommon style of loop variable declaration.
~
22c.
IMO it is over-complicated code; e.g. same loop can be written like this:
SUGGESTION
for (i = 0; i < max_replication_slots && !found; i++)
{
    ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

    if (s->in_use)
        found = (strcmp(NameStr(s->data.name), remote_slot->name) == 0);
}
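
For anyone who wants to convince themselves the shortened loop behaves the same, here is a standalone version. SlotSketch is a hypothetical stand-in for ReplicationSlot (only the two fields the loop reads), the array and its length are passed in rather than taken from ReplicationSlotCtl, and the LWLock acquire/release around the loop is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-in for the parts of ReplicationSlot used here. */
typedef struct SlotSketch
{
    bool        in_use;
    const char *name;
} SlotSketch;

/* Return true if an in-use slot with the wanted name exists. */
static bool
slot_name_in_use(const SlotSketch *slots, int nslots, const char *wanted)
{
    bool        found = false;
    int         i;

    /* The "!found" condition ends the loop on the first match. */
    for (i = 0; i < nslots && !found; i++)
    {
        const SlotSketch *s = &slots[i];

        if (s->in_use)
            found = (strcmp(s->name, wanted) == 0);
    }

    return found;
}
```

The loop variable is declared at the top of the function, which also takes care of 22b.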
~~~
23. synchronize_slots
+ /* Construct query to get slots info from the primary */
+ initStringInfo(&s);
+ if (!construct_slot_query(&s, dbids))
+ {
+ pfree(s.data);
+ CommitTransactionCommand();
+ LWLockRelease(SlotSyncWorkerLock);
+ return naptime;
+ }
As noted elsewhere, it seems construct_slot_query() will never return
false and so this block of code is unreachable.
~~~
24.
+ /* Create list of remote slot names to be used by drop_obsolete_slots */
+ remote_slot_list = lappend(remote_slot_list, remote_slot);
This is a list of slots, not just slot names.
~~~
25.
+ /*
+ * Update nap time in case of non-zero value returned. The zero value
+ * is returned if remote_slot is not the one being monitored.
+ */
+ value = compute_naptime(remote_slot);
+ if (value)
+ naptime = value;
If the compute_naptime API is changed as suggested in a prior review
comment then this can be simplified to something like:
SUGGESTION:
/* Update nap time as required depending on slot activity. */
compute_naptime(remote_slot, &naptime);
~~~
26.
+ /*
+ * Drop local slots which no longer need to be synced i.e. these either do
+ * not exist on primary or are no longer part of synchronize_slot_names.
+ */
+ drop_obsolete_slots(dbids, remote_slot_list);
/which no longer/that no longer/
I thought it might be better to omit the "i.e." part. Just leave it to
the function-header of drop_obsolete_slots for a detailed explanation
about *which* slots are candidates for dropping.
~
27.
+ /* We are done, free remot_slot_list elements */
+ foreach(cell, remote_slot_list)
+ {
+ RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell);
+
+ pfree(remote_slot);
+ }
27a.
/remot_slot_list/remote_slot_list/
~
27b.
Isn't this just the same as the one-liner:
list_free_deep(remote_slot_list);
~~~
28.
+/*
+ * Initialize the list from raw synchronize_slot_names and cache it, in order
+ * to avoid parsing it repeatedly. Done at slot-sync worker startup and after
+ * each SIGHUP.
+ */
+static void
+SlotSyncInitSlotNamesList()
+{
+ char *rawname;
+
+ if (strcmp(synchronize_slot_names, "") != 0 &&
+ strcmp(synchronize_slot_names, "*") != 0)
+ {
+ rawname = pstrdup(synchronize_slot_names);
+ SplitIdentifierString(rawname, ',', &sync_slot_names_list);
+ }
+}
28a.
Why is this static function name camel-case, unlike all the others?
~
28b.
What about when the sync_slot_names_list changes from a value to "" or
"*"? Shouldn't this function be setting sync_slot_names_list = NIL for
that scenario?
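
To show the kind of reset I mean, here is a standalone sketch. It does not use List or SplitIdentifierString; instead the cached list is a hypothetical fixed-size array (sync_names / sync_names_count) and the parsing is a plain strtok split on commas, with none of the whitespace or quoting handling the real function does. The only point is that the old list is discarded before looking at the new GUC value.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MAX_SYNC_NAMES 16

/* Hypothetical stand-ins for the cached list and its backing storage. */
static char *sync_names_raw = NULL;
static char *sync_names[MAX_SYNC_NAMES];
static int   sync_names_count = 0;

static void
init_slot_names_list_sketch(const char *guc_value)
{
    char       *tok;

    /* Always drop the previous list first, so "" and "*" leave it empty. */
    free(sync_names_raw);
    sync_names_raw = NULL;
    sync_names_count = 0;

    if (strcmp(guc_value, "") == 0 || strcmp(guc_value, "*") == 0)
        return;

    sync_names_raw = malloc(strlen(guc_value) + 1);
    if (sync_names_raw == NULL)
        return;
    strcpy(sync_names_raw, guc_value);

    /* Naive comma split; the entries point into sync_names_raw. */
    for (tok = strtok(sync_names_raw, ",");
         tok != NULL && sync_names_count < MAX_SYNC_NAMES;
         tok = strtok(NULL, ","))
        sync_names[sync_names_count++] = tok;
}
```

Without the unconditional reset at the top, a reload that changes the value from "slot_a,slot_b" to "*" would keep using the stale two-entry list.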
~~~
29. remote_connect
+/*
+ * Connect to remote (primary) server.
+ *
+ * This uses primary_conninfo in order to connect to primary. For slot-sync
+ * to work, primary_conninfo is expected to have dbname as well.
+ */
+static WalReceiverConn *
+remote_connect()
29a.
I felt it might be more helpful to say "GUC primary_conninfo" instead
of just 'primary_conninfo' the first time this is mentioned.
~
29b.
/connect to primary/connect to the primary/
~
29c.
/is expected to have/is required to specify/
~~~
30. reconnect_if_needed
+/*
+ * Reconnect to remote (primary) server if PrimaryConnInfo got changed.
+ */
+static WalReceiverConn *
+reconnect_if_needed(WalReceiverConn *wrconn_prev, char *conninfo_prev)
/got changed/has changed/
~~~
31.
+static WalReceiverConn *
+reconnect_if_needed(WalReceiverConn *wrconn_prev, char *conninfo_prev)
+{
+ WalReceiverConn *wrconn = NULL;
+
+ /* If no change in PrimaryConnInfo, return previous connection itself */
+ if (strcmp(conninfo_prev, PrimaryConnInfo) == 0)
+ return wrconn_prev;
+
+ walrcv_disconnect(wrconn);
+ wrconn = remote_connect();
+ return wrconn;
+}
/return previous/return the previous/
Disconnecting NULL is a bug, isn't it? Don't you mean to disconnect 'wrconn_prev'?
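
What I would expect is sketched below as a standalone example. ConnSketch, connect_sketch and disconnect_sketch are hypothetical stand-ins for WalReceiverConn, remote_connect and walrcv_disconnect, and the current conninfo is passed as a parameter instead of being read from PrimaryConnInfo. The stand-in disconnect only marks the connection closed, so that the effect can be observed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for WalReceiverConn. */
typedef struct ConnSketch
{
    char        conninfo[128];
    bool        open;
} ConnSketch;

static ConnSketch *
connect_sketch(const char *conninfo)
{
    ConnSketch *conn = calloc(1, sizeof(ConnSketch));

    if (conn != NULL)
    {
        /* calloc zeroed the buffer, so the copy stays NUL-terminated. */
        strncpy(conn->conninfo, conninfo, sizeof(conn->conninfo) - 1);
        conn->open = true;
    }
    return conn;
}

static void
disconnect_sketch(ConnSketch *conn)
{
    /* Only mark closed (no free) so the caller can still inspect it. */
    conn->open = false;
}

static ConnSketch *
reconnect_if_needed_sketch(ConnSketch *conn_prev, const char *conninfo_prev,
                           const char *conninfo_now)
{
    /* If the conninfo has not changed, return the previous connection. */
    if (strcmp(conninfo_prev, conninfo_now) == 0)
        return conn_prev;

    /* Disconnect the previous connection, not a fresh NULL pointer. */
    disconnect_sketch(conn_prev);
    return connect_sketch(conninfo_now);
}
```

With this shape the local 'wrconn' variable initialized to NULL is not needed at all.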
~~~
32. slotsync_worker_detach
+/*
+ * Detach the worker from DSM and update 'proc' and 'in_use'.
+ * Logical replication launcher will come to know using these
+ * that the worker has shutdown.
+ */
+static void
+slotsync_worker_detach(int code, Datum arg)
+{
+ dsa_detach((dsa_area *) DatumGetPointer(arg));
+ LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
+ MySlotSyncWorker->hdr.in_use = false;
+ MySlotSyncWorker->hdr.proc = NULL;
+ LWLockRelease(SlotSyncWorkerLock);
+}
I expected this function to be in the same module as
slotsync_worker_attach. It seems a bit strange to have them separated.
~~~
33. ReplSlotSyncMain
+ ereport(ERROR,
+ (errmsg("The dbname not specified in primary_conninfo, skipping"
+ " slots synchronization"),
+ errhint("Specify dbname in primary_conninfo for slots"
+ " synchronization to proceed")));
/not specified in/was not specified in/
/slots synchronization/slot synchronization/ (??) -- there are multiple of these
~
34.
+ /*
+ * Connect to the database specified by user in PrimaryConnInfo. We need
+ * database connection for walrcv_exec to work. Please see comments atop
+ * libpqrcv_exec.
+ */
/database connection/a database connection/
~~~
35.
+ /* Reconnect if primary_conninfo got changed */
+ if (config_reloaded)
+ wrconn = reconnect_if_needed(wrconn, conninfo_prev);
SUGGESTION
Reconnect if GUC primary_conninfo has changed.
~
36.
+ /*
+ * The slot-sync worker must not get here because it will only stop when
+ * it receives a SIGINT from the logical replication launcher, or when
+ * there is an error. None of these cases will allow the code to reach
+ * here.
+ */
+ Assert(false);
36a.
/must not/cannot/
36b.
"None of these cases will allow the code to reach here." <-- redundant sentence
======
Kind Regards,
Peter Smith.
Fujitsu Australia