From: | Peter Smith <smithpb2250(at)gmail(dot)com> |
---|---|
To: | shveta malik <shveta(dot)malik(at)gmail(dot)com> |
Cc: | "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>, Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, "Hayato Kuroda (Fujitsu)" <kuroda(dot)hayato(at)fujitsu(dot)com>, Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, Bruce Momjian <bruce(at)momjian(dot)us>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Ajin Cherian <itsajin(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)alvh(dot)no-ip(dot)org> |
Subject: | Re: Synchronizing slots from primary to standby |
Date: | 2023-10-04 03:22:41 |
Message-ID: | CAHut+Ptbb3Ydx40a0p7Qovvp-4cC4ZCDreGRjmFzou8mjh2PmA@mail.gmail.com |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-hackers |
Here are some review comments for v20-0002.
======
1. GENERAL - errmsg/elog messages
There are a a lot of minor problems and/or quirks across all the
message texts. Here is a summary of some I found:
ERROR
errmsg("could not receive list of slots from the primary server: %s",
errmsg("invalid response from primary server"),
errmsg("invalid connection string syntax: %s",
errmsg("replication slot-sync worker slot %d is empty, cannot attach",
errmsg("replication slot-sync worker slot %d is already used by
another worker, cannot attach",
errmsg("replication slot-sync worker slot %d is already used by
another worker, cannot attach",
errmsg("could not connect to the primary server: %s",
errmsg("operation not permitted on replication slots on standby which
are synchronized from primary")));
/primary/the primary/
errmsg("could not fetch invalidation cuase for slot \"%s\" from primary: %s",
/cuase/cause/
/primary/the primary/
errmsg("slot \"%s\" disapeared from the primary",
/disapeared/disappeared/
errmsg("could not fetch slot info from the primary: %s",
errmsg("could not connect to the primary server: %s", err)));
errmsg("could not map dynamic shared memory segment for slot-sync worker")));
errmsg("physical replication slot %s found in synchronize_slot_names",
slot name not quoted?
---
WARNING
errmsg("out of background worker slots"),
errmsg("Replication slot-sync worker failed to attach to worker-pool slot %d",
case?
errmsg("Removed database %d from replication slot-sync worker %d;
dbcount now: %d",
case?
errmsg("Skipping slots synchronization as primary_slot_name is not set."));
case?
errmsg("Skipping slots synchronization as hot_standby_feedback is off."));
case?
errmsg("Skipping slots synchronization as dbname is not specified in
primary_conninfo."));
case?
errmsg("slot-sync wait for slot %s interrupted by promotion, slot
creation aborted",
errmsg("could not fetch slot info for slot \"%s\" from primary: %s",
/primary/the primary/
errmsg("slot \"%s\" disappeared from the primary, aborting slot creation",
errmsg("slot \"%s\" invalidated on primary, aborting slot creation",
errmsg("slot-sync for slot %s interrupted by promotion, sync not possible",
slot name not quoted?
errmsg("skipping sync of slot \"%s\" as the received slot-sync lsn
%X/%X is ahead of the standby position %X/%X",
errmsg("not synchronizing slot %s; synchronization would move it backward",
slot name not quoted?
/backward/backwards/
---
LOG
errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
errmsg("Added database %d to replication slot-sync worker %d; dbcount now: %d",
errmsg("Stopping replication slot-sync worker %d",
errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin
(%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",
errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)and catalog
xmin (%u) has now passed local slot LSN (%X/%X) and catalog xmin
(%u)",
missing spaces?
elog(LOG, "Dropped replication slot \"%s\" ",
extra space?
why this one is elog but others are not?
elog(LOG, "Replication slot-sync worker %d is shutting down on
receiving SIGINT", MySlotSyncWorker->slot);
case?
why this one is elog but others are not?
elog(LOG, "Replication slot-sync worker %d started", worker_slot);
case?
why this one is elog but others are not?
----
DEBUG1
errmsg("allocated dsa for slot-sync worker for dbcount: %d"
worker number not given?
should be elog?
errmsg_internal("logical replication launcher started")
should be elog?
----
DEBUG2
elog(DEBUG2, "slot-sync worker%d's query:%s \n",
missing space after 'worker'
extra space before \n
======
.../libpqwalreceiver/libpqwalreceiver.c
2. libpqrcv_get_dbname_from_conninfo
+/*
+ * Get database name from primary conninfo.
+ *
+ * If dbanme is not found in connInfo, return NULL value.
+ * The caller should take care of handling NULL value.
+ */
+static char *
+libpqrcv_get_dbname_from_conninfo(const char *connInfo)
2a.
/dbanme/dbname/
~
2b.
"The caller should take care of handling NULL value."
IMO this is not very useful; it's like saying "caller must handle
function return values".
~~~
3.
+ for (opt = opts; opt->keyword != NULL; ++opt)
+ {
+ /* Ignore connection options that are not present. */
+ if (opt->val == NULL)
+ continue;
+
+ if (strcmp(opt->keyword, "dbname") == 0 && opt->val[0] != '\0')
+ {
+ dbname = pstrdup(opt->val);
+ }
+ }
3a.
If there are multiple "dbname" in the conninfo then it will be the
LAST one that is returned.
Judging by my quick syntax experiment (below) this seemed like the
correct thing to do, but I think there should be some comment to
explain about it.
test_sub=# create subscription sub1 connection 'dbname=foo dbname=bar
dbname=test_pub' publication pub1;
2023-09-28 19:15:15.012 AEST [23997] WARNING: subscriptions created
by regression test cases should have names starting with "regress_"
WARNING: subscriptions created by regression test cases should have
names starting with "regress_"
NOTICE: created replication slot "sub1" on publisher
CREATE SUBSCRIPTION
~
3b.
The block brackets {} are not needed for the single statement.
~
3c.
Since there is only one keyword of interest here it seemed overkill to
have a separate 'continue' check. Why not do everything in one line:
for (opt = opts; opt->keyword != NULL; ++opt)
{
if (strcmp(opt->keyword, "dbname") == 0 && opt->val && opt->val[0] != '\0')
dbname = pstrdup(opt->val);
}
======
src/backend/replication/logical/launcher.c
4.
+/*
+ * The local variables to store the current values of slot-sync related GUCs
+ * before each ConfigReload.
+ */
+static char *PrimaryConnInfoPreReload = NULL;
+static char *PrimarySlotNamePreReload = NULL;
+static char *SyncSlotNamesPreReload = NULL;
/The local variables/Local variables/
~~~
5. fwd declare
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
+static void slotsync_worker_cleanup(SlotSyncWorker *worker);
static int logicalrep_pa_worker_count(Oid subid);
5a.
Hmmn, I think there were lot more added static functions than just this one.
e.g. what about all these?
static SlotSyncWorker *slotsync_worker_find
static dsa_handle slotsync_dsa_setup
static bool slotsync_worker_launch_or_reuse
static void slotsync_worker_stop_internal
static void slotsync_workers_stop
static void slotsync_remove_obsolete_dbs
static WalReceiverConn *primary_connect
static void SaveCurrentSlotSyncConfigs
static bool SlotSyncConfigsChanged
static void ApplyLauncherStartSlotSync
static void ApplyLauncherStartSubs
~
5b.
There are inconsistent name style used for the new static functions --
e.g. snake_case versus CamelCase.
~~~
6. WaitForReplicationWorkerAttach
int rc;
+ bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? true : false;
This seemed a hacky way to distinguish the sync-slot workers from
other kinds of workers. Wouldn't it be better to pass another
parameter to this function?
~~~
7. slotsync_worker_attach
It looks like almost a clone of the logicalrep_worker_attach. Seems a
shame if cannot make use of common code.
~~~
8. slotsync_worker_find
+ * Walks the slot-sync workers pool and searches for one that matches given
+ * dbid. Since one worker can manage multiple dbs, so it walks the db array in
+ * each worker to find the match.
8a.
SUGGESTION
Searches the slot-sync worker pool for the worker who manages the
specified dbid. Because a worker can manage multiple dbs, also walk
the db array of each worker to find the match.
~
8b.
Should the comment also say something like "Returns NULL if no
matching worker is found."
~~~
9.
+ /* Search for attached worker for a given dbid */
SUGGESTION
Search for an attached worker managing the given dbid.
~~~
10.
+{
+ int i;
+ SlotSyncWorker *res = NULL;
+ Oid *dbids;
+
+ Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
+
+ /* Search for attached worker for a given dbid */
+ for (i = 0; i < max_slotsync_workers; i++)
+ {
+ SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+ int cnt;
+
+ if (!w->hdr.in_use)
+ continue;
+
+ dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
+ for (cnt = 0; cnt < w->dbcount; cnt++)
+ {
+ Oid wdbid = dbids[cnt];
+
+ if (wdbid == dbid)
+ {
+ res = w;
+ break;
+ }
+ }
+
+ /* If worker is found, break the outer loop */
+ if (res)
+ break;
+ }
+
+ return res;
+}
IMO this logical can be simplified a lot:
- by not using the 'res' variable; directly return instead.
- also moved the 'dbids' declaration.
- and 'cnt' variable seems not meaningful; replace with 'dbidx' for
the db array index IMO.
For example (25 lines instead of 35 lines)
{
int i;
Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_SHARED));
/* Search for an attached worker managing the given dbid. */
for (i = 0; i < max_slotsync_workers; i++)
{
SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
int dbidx;
Oid *dbids;
if (!w->hdr.in_use)
continue;
dbids = (Oid *) dsa_get_address(w->dbids_dsa, w->dbids_dp);
for (dbidx = 0; dbidx < w->dbcount; dbidx++)
{
if (dbids[dbidx] == dbid)
return w;
}
}
return NULL;
}
~~~
11. slot_sync_dsa_setup
+/*
+ * Setup DSA for slot-sync worker.
+ *
+ * DSA is needed for dbids array. Since max number of dbs a worker can manage
+ * is not known, so initially fixed size to hold DB_PER_WORKER_ALLOC_INIT
+ * dbs is allocated. If this size is exhausted, it can be extended using
+ * dsa free and allocate routines.
+ */
+static dsa_handle
+slotsync_dsa_setup(SlotSyncWorker *worker, int alloc_db_count)
11a.
SUGGESTION
DSA is used for the dbids array. Because the maximum number of dbs a
worker can manage is not known, initially enough memory for
DB_PER_WORKER_ALLOC_INIT dbs is allocated. If this size is exhausted,
it can be extended using dsa free and allocate routines.
~
11b.
It doesn't make sense for the comment to say DB_PER_WORKER_ALLOC_INIT
is the initial allocation, but then the function has a parameter
'alloc_db_count' (which is always passed as DB_PER_WORKER_ALLOC_INIT).
IMO revemo the 2nd parameter from this function and hardwire the
initial allocation same as what the function comment says.
~~~
12.
+ /* Be sure any memory allocated by DSA routines is persistent. */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
/Be sure any memory/Ensure the memory/
~~~
13. slotsync_worker_launch_or_reuse
+/*
+ * Slot-sync worker launch or reuse
+ *
+ * Start new slot-sync background worker from the pool of available workers
+ * going by max_slotsync_workers count. If the worker pool is exhausted,
+ * reuse the existing worker with minimum number of dbs. The idea is to
+ * always distribute the dbs equally among launched workers.
+ * If initially allocated dbids array is exhausted for the selected worker,
+ * reallocate the dbids array with increased size and copy the existing
+ * dbids to it and assign the new one as well.
+ *
+ * Returns true on success, false on failure.
+ */
/going by/limited by/ (??)
~~~
14.
+ BackgroundWorker bgw;
+ BackgroundWorkerHandle *bgw_handle;
+ uint16 generation;
+ SlotSyncWorker *worker = NULL;
+ uint32 mindbcnt = 0;
+ uint32 alloc_count = 0;
+ uint32 copied_dbcnt = 0;
+ Oid *copied_dbids = NULL;
+ int worker_slot = -1;
+ dsa_handle handle;
+ Oid *dbids;
+ int i;
+ bool attach;
IIUC many of these variables can be declared at a different scope in
this function, so they will be closer to where they are used.
~~~
15.
+ /*
+ * We need to do the modification of the shared memory under lock so that
+ * we have consistent view.
+ */
+ LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE);
The current comment seems too much.
SUGGESTION
The shared memory must only be modified under lock.
~~~
16.
+ /* Find unused worker slot. */
+ for (i = 0; i < max_slotsync_workers; i++)
+ {
+ SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+
+ if (!w->hdr.in_use)
+ {
+ worker = w;
+ worker_slot = i;
+ break;
+ }
+ }
+
+ /*
+ * If all the workers are currently in use. Find the one with minimum
+ * number of dbs and use that.
+ */
+ if (!worker)
+ {
+ for (i = 0; i < max_slotsync_workers; i++)
+ {
+ SlotSyncWorker *w = &LogicalRepCtx->ss_workers[i];
+
+ if (i == 0)
+ {
+ mindbcnt = w->dbcount;
+ worker = w;
+ worker_slot = i;
+ }
+ else if (w->dbcount < mindbcnt)
+ {
+ mindbcnt = w->dbcount;
+ worker = w;
+ worker_slot = i;
+ }
+ }
+ }
Why not combine these 2 loops, to avoid iterating over the same slots
twice? Then, exit the loop immediately if unused worker found,
otherwise if reach the end of loop having not found anything unused
then you will already know the one having least dbs.
~~~
17.
+ /* Remember the old dbids before we reallocate dsa. */
+ copied_dbcnt = worker->dbcount;
+ copied_dbids = (Oid *) palloc0(worker->dbcount * sizeof(Oid));
+ memcpy(copied_dbids, dbids, worker->dbcount * sizeof(Oid));
17a.
Who frees this copied_dbids memory when you are finished needed it. It
seems allocated in the TopMemoryContext so IIUC this is a leak.
~
17b.
These are the 'old' values. Not the 'copied' values. The copied_xxx
variable names seem misleading.
~~~
18.
+ /* Prepare the new worker. */
+ worker->hdr.launch_time = GetCurrentTimestamp();
+ worker->hdr.in_use = true;
If a new worker is required then the launch_time is set like above.
+ {
+ slot_db_data->last_launch_time = now;
+
+ slotsync_worker_launch_or_reuse(slot_db_data->database);
+ }
Meanwhile, at the caller of slotsync_worker_launch_or_reuse(), the
dbid launch_time was already set as well. And those two timestamps are
almost (but not quite) the same value. Isn't that a bit strange?
~~~
19.
+ /* Initial DSA setup for dbids array to hold DB_PER_WORKER_ALLOC_INIT dbs */
+ handle = slotsync_dsa_setup(worker, DB_PER_WORKER_ALLOC_INIT);
+ dbids = (Oid *) dsa_get_address(worker->dbids_dsa, worker->dbids_dp);
+
+ dbids[worker->dbcount++] = dbid;
Where was this worker->dbcount assigned to 0?
Maybe it's better to do this explicity under the "/* Prepare the new
worker. */" comment.
~~~
20.
+ if (!attach)
+ ereport(WARNING,
+ (errmsg("Replication slot-sync worker failed to attach to "
+ "worker-pool slot %d", worker_slot)));
+
+ /* Attach is done, now safe to log that the worker is managing dbid */
+ if (attach)
+ ereport(LOG,
+ (errmsg("Added database %d to replication slot-sync "
+ "worker %d; dbcount now: %d",
+ dbid, worker_slot, worker->dbcount)));
20a.
IMO this should be coded as "if (attach) ...; else ..."
~
99b.
In other code if it failed to register then slotsync_worker_cleanup
code is called. How come similar code is not done when fails to
attach?
~~~
21. slotsync_worker_stop_internal
+/*
+ * Internal function to stop the slot-sync worker and wait until it detaches
+ * from the slot-sync worker-pool slot.
+ */
+static void
+slotsync_worker_stop_internal(SlotSyncWorker *worker)
IIUC this function does a bit more than what the function comment
says. IIUC (again) I think the "detached" worker slot will still be
flagged as 'inUse' but this function then does the extra step of
calling slotsync_worker_cleanup() function to make the worker slot
available for next process that needs it, am I correct?
In this regard, this function seems a lot more like
logicalrep_worker_detach() function comment, so there seems some kind
of muddling of the different function names here... (??).
~~~
22. slotsync_remove_obsolete_dbs
This function says:
+/*
+ * Slot-sync workers remove obsolete DBs from db-list
+ *
+ * If the DBIds fetched from the primary are lesser than the ones being managed
+ * by slot-sync workers, remove extra dbs from worker's db-list. This
may happen
+ * if some slots are removed on primary but 'synchronize_slot_names' has not
+ * been changed yet.
+ */
+static void
+slotsync_remove_obsolete_dbs(List *remote_dbs)
But, there was another similar logic function too:
+/*
+ * Drop obsolete slots
+ *
+ * Drop the slots which no longer need to be synced i.e. these either
+ * do not exist on primary or are no longer part of synchronize_slot_names.
+ *
+ * Also drop the slots which are valid on primary and got invalidated
+ * on standby due to conflict (say required rows removed on primary).
+ * The assumption is, these will get recreated in next sync-cycle and
+ * it is okay to drop and recreate such slots as long as these are not
+ * consumable on standby (which is the case currently).
+ */
+static void
+drop_obsolete_slots(Oid *dbids, List *remote_slot_list)
Those function header comments suggest these have a lot of overlapping
functionality.
Can't those 2 functions be combined? Or maybe one delegate to the other?
~~~
23.
+ ListCell *lc;
+ Oid *dbids;
+ int widx;
+ int dbidx;
+ int i;
Scope of some of these variable declarations can be different so they
are declared closer to where they are used.
~~~
24.
+ /* If not found, then delete this db from worker's db-list */
+ if (!found)
+ {
+ for (i = dbidx; i < worker->dbcount; i++)
+ {
+ /* Shift the DBs and get rid of wdbid */
+ if (i < (worker->dbcount - 1))
+ dbids[i] = dbids[i + 1];
+ }
IIUC, that shift/loop could just have been a memmove() call to remove
one Oid element.
~~~
25.
+ /* If dbcount for any worker has become 0, shut it down */
+ for (widx = 0; widx < max_slotsync_workers; widx++)
+ {
+ SlotSyncWorker *worker = &LogicalRepCtx->ss_workers[widx];
+
+ if (worker->hdr.in_use && !worker->dbcount)
+ slotsync_worker_stop_internal(worker);
+ }
Is it safe to stop this unguarded by SlotSyncWorkerLock locking? Is
there a window where another dbid decides to reuse this worker at the
same time this process is about to stop it?
~~~
26. primary_connect
+/*
+ * Connect to primary server for slotsync purpose and return the connection
+ * info. Disconnect previous connection if provided in wrconn_prev.
+ */
/primary server/the primary server/
~~~
27.
+ if (!RecoveryInProgress())
+ return NULL;
+
+ if (max_slotsync_workers == 0)
+ return NULL;
+
+ if (strcmp(synchronize_slot_names, "") == 0)
+ return NULL;
+
+ /* The primary_slot_name is not set */
+ if (!WalRcv || WalRcv->slotname[0] == '\0')
+ {
+ ereport(WARNING,
+ errmsg("Skipping slots synchronization as primary_slot_name "
+ "is not set."));
+ return NULL;
+ }
+
+ /* The hot_standby_feedback must be ON for slot-sync to work */
+ if (!hot_standby_feedback)
+ {
+ ereport(WARNING,
+ errmsg("Skipping slots synchronization as hot_standby_feedback "
+ "is off."));
+ return NULL;
+ }
How come some of these checks giving WARNING that slot synchronization
will be skipped, but others are just silently returning NULL?
~~~
28. SaveCurrentSlotSyncConfigs
+static void
+SaveCurrentSlotSyncConfigs()
+{
+ PrimaryConnInfoPreReload = pstrdup(PrimaryConnInfo);
+ PrimarySlotNamePreReload = pstrdup(WalRcv->slotname);
+ SyncSlotNamesPreReload = pstrdup(synchronize_slot_names);
+}
Shouldn't this code also do pfree first? Otherwise these will slowly
leak every time this function is called, right?
~~~
29. SlotSyncConfigsChanged
+static bool
+SlotSyncConfigsChanged()
+{
+ if (strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)
+ return true;
+
+ if (strcmp(PrimarySlotNamePreReload, WalRcv->slotname) != 0)
+ return true;
+
+ if (strcmp(SyncSlotNamesPreReload, synchronize_slot_names) != 0)
+ return true;
I felt those can all be combined to have 1 return instead of 3.
~~~
30.
+ /*
+ * If we have reached this stage, it means original value of
+ * hot_standby_feedback was 'true', so consider it changed if 'false' now.
+ */
+ if (!hot_standby_feedback)
+ return true;
"If we have reached this stage" seems a bit vague. Can this have some
more explanation? And, maybe also an Assert(hot_standby_feedback); is
helpful in the calling code (before the config is reloaded)?
~~~
31. ApplyLauncherStartSlotSync
+ * It connects to primary, get the list of DBIDs for slots configured in
+ * synchronize_slot_names. It then launces the slot-sync workers as per
+ * max_slotsync_workers and then assign the DBs equally to the workers
+ * launched.
+ */
SUGGESTION (fix typos etc)
Connect to the primary, to get the list of DBIDs for slots configured
in synchronize_slot_names. Then launch slot-sync workers (limited by
max_slotsync_workers) where the DBs are distributed equally among
those workers.
~~~
32.
+static void
+ApplyLauncherStartSlotSync(long *wait_time, WalReceiverConn *wrconn)
Why does this function even have 'Apply' in the name when it is
nothing to do with an apply worker; looks like some cut/paste
hangover. How about calling it something like 'LaunchSlotSyncWorkers'
~~~
33.
+ /* If connection is NULL due to lack of correct configurations, return */
+ if (!wrconn)
+ return;
IMO it would be better to Assert wrconn in this function. If it is
NULL then it should be checked a the caller, otherwise it just raises
more questions -- like "who logged the warning about bad
configuration" etc (which I already questions the NULL returns of
primary_connect.
~~~
34.
+ if (!OidIsValid(slot_db_data->database))
+ continue;
This represents some kind of integrity error doesn't it? Is it really
OK just to silently skip such a thing?
~~~
35.
+ /*
+ * If the worker is eligible to start now, launch it. Otherwise,
+ * adjust wait_time so that we'll wake up as soon as it can be
+ * started.
+ *
+ * Each apply worker can only be restarted once per
+ * wal_retrieve_retry_interval, so that errors do not cause us to
+ * repeatedly restart the worker as fast as possible.
+ */
35a.
I found the "we" part of "so that we'll wake up..." to be a bit
misleading. There is no waiting in this function; that wait value is
handed back to the caller to deal with. TBH, I did not really
understand why it is even necessary tp separate the waiting
calculation *per-worker* like this. It seems to overcomplicate things
and it might even give results like 1st worker is not started but last
works is started (if enough time elapsed in the loop). Why can't all
this wait logic be done one time up front, and either (a) start all
necessary workers, or (b) start none of them and wait a bit longer.
~
35b.
"Each apply worker". Why is this talking about "apply" workers? Maybe
cut/paste error?
~~~
36.
+ last_launch_tried = slot_db_data->last_launch_time;
+ now = GetCurrentTimestamp();
+ if (last_launch_tried == 0 ||
+ (elapsed = TimestampDifferenceMilliseconds(last_launch_tried, now)) >=
+ wal_retrieve_retry_interval)
+ {
+ slot_db_data->last_launch_time = now;
+
+ slotsync_worker_launch_or_reuse(slot_db_data->database);
+ }
+ else
+ {
+ *wait_time = Min(*wait_time,
+ wal_retrieve_retry_interval - elapsed);
+ }
36a.
IMO this might be simpler if you add another variable like bool 'launch_now':
last_launch_tried = ...
now = ...
elapsed = ...
launch_now = elapsed >= wal_retrieve_retry_interval;
~
36b.
Do you really care about checking "last_launch_tried == 0"; If it
really is zero, then I thought the elapsed check should be enough.
~
36c.
Does this 'last_launch_time' really need to be in some shared memory?
Won't a static variable suffice?
~~~
37. ApplyLauncherStartSubs
Wouldn't a better name for the function be something like
'LaunchSubscriptionApplyWorker'? (it is a better match for the
suggested LaunchSlotSyncWorkers)
~~~
38. ApplyLauncherMain
Now that this is not only for Apply worker but also for SlotSync
workers, maybe this function should be renamed as just LauncherMain,
or something equally generic?
~~~
39.
+ load_file("libpqwalreceiver", false);
+
+ wrconn = primary_connect(NULL);
+
This connection did not exist in the HEAD code so I think it is added
only for the slot-sync logic. IIUC it is still doing nothing for the
non-slot-sync cases because primary_connect will silently return in
that case:
+ if (!RecoveryInProgress())
+ return NULL;
IMO this is too sneaky, and it is misleading to see the normal apply
worker launch apparently ccnnecting to something when it is not really
doing so AFAIK. I think these conditions should be done explicity here
at the caller to remove any such ambiguity.
~~~
40.
+ if (!RecoveryInProgress())
+ ApplyLauncherStartSubs(&wait_time);
+ else
+ ApplyLauncherStartSlotSync(&wait_time, wrconn);
40a.
IMO this is deserving of a comment to explain why RecoveryInProgress
means to perform the slot-synchronization.
~
40b.
Also, better to have positive check RecoveryInProgress() instead of
!RecoveryInProgress()
~~~
41.
if (ConfigReloadPending)
{
+ bool ssConfigChanged = false;
+
+ SaveCurrentSlotSyncConfigs();
+
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+
+ /*
+ * Stop the slot-sync workers if any of the related GUCs changed.
+ * These will be relaunched as per the new values during next
+ * sync-cycle.
+ */
+ ssConfigChanged = SlotSyncConfigsChanged();
+ if (ssConfigChanged)
+ slotsync_workers_stop();
+
+ /* Reconnect in case primary_conninfo has changed */
+ wrconn = primary_connect(wrconn);
}
}
~
41a.
The 'ssConfigChanged' assignement at declaration is not needed.
Indeed, the whole variable is not really necessary because it is used
only once.
~
41b.
/as per the new values/using the new values/
~
41c.
+ /* Reconnect in case primary_conninfo has changed */
+ wrconn = primary_connect(wrconn);
To avoid unnecessary reconnections, shouldn't this be done only if
(ssConfigChanged).
In fact, assuming the comment is correct, reconnect only if
(strcmp(PrimaryConnInfoPreReload, PrimaryConnInfo) != 0)
======
src/backend/replication/logical/slotsync.c
42. wait_for_primary_slot_catchup
+ ereport(LOG,
+ errmsg("waiting for remote slot \"%s\" LSN (%u/%X) and catalog xmin"
+ " (%u) to pass local slot LSN (%u/%X) and and catalog xmin (%u)",
+ remote_slot->name,
+ LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+ remote_slot->catalog_xmin,
+ LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn),
+ MyReplicationSlot->data.catalog_xmin));
AFAIK it is usual for the LSN format string to be %X/%X (not %u/%X like here).
~~~
43.
+ appendStringInfo(&cmd,
+ "SELECT restart_lsn, confirmed_flush_lsn, catalog_xmin"
+ " FROM pg_catalog.pg_replication_slots"
+ " WHERE slot_name = %s",
+ quote_literal_cstr(remote_slot->name));
double space before FROM?
~~~
44. synchronize_one_slot
+ /*
+ * We might not have the WALs retained locally corresponding to
+ * remote's restart_lsn if our local restart_lsn and/or local
+ * catalog_xmin is ahead of remote's one. And thus we can not create
+ * the local slot in sync with primary as that would mean moving local
+ * slot backward. Thus wait for primary's restart_lsn and catalog_xmin
+ * to catch up with the local ones and then do the sync.
+ */
+ if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn ||
+ TransactionIdPrecedes(remote_slot->catalog_xmin,
+ MyReplicationSlot->data.catalog_xmin))
+ {
+ if (!wait_for_primary_slot_catchup(wrconn, remote_slot))
+ {
+ /*
+ * The remote slot didn't catch up to locally reserved
+ * position
+ */
+ ReplicationSlotRelease();
+ CommitTransactionCommand();
+ return;
+ }
SUGGESTION (comment is slightly simplified)
If the local restart_lsn and/or local catalog_xmin is ahead of those
on the remote then we cannot create the local slot in sync with
primary because that would mean moving local slot backwards. In this
case we will wait for primary's restart_lsn and catalog_xmin to catch
up with the local one before attempting the sync.
======
Kind Regards,
Peter Smith.
Fujitsu Australia
From | Date | Subject | |
---|---|---|---|
Next Message | Michael Paquier | 2023-10-04 03:57:29 | Re: Making aggregate deserialization (and WAL receive) functions slightly faster |
Previous Message | Thomas wen | 2023-10-04 02:57:08 | 回复: XID formatting and SLRU refactorings (was: Add 64-bit XIDs into PostgreSQL 15) |