From 3e541cbaacfde2c212dd95c7d53329399511a9ec Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Sat, 26 Apr 2025 00:06:40 +0900 Subject: [PATCH v1] Refactor logicalrep_worker_launch(). --- src/backend/replication/logical/launcher.c | 64 +++++++++++----------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 10677da56b2..e2af6ccbdeb 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -96,7 +96,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); -static int logicalrep_pa_worker_count(Oid subid); +static void logicalrep_worker_count(Oid subid, int *nsync, int *npa); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); @@ -336,7 +336,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); -retry: /* Find unused worker slot. */ for (i = 0; i < max_logical_replication_workers; i++) { @@ -350,7 +349,7 @@ retry: } } - nsyncworkers = logicalrep_sync_worker_count(subid); + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); now = GetCurrentTimestamp(); @@ -359,7 +358,8 @@ retry: * reason we do this is because if some worker failed to start up and its * parent has crashed while waiting, the in_use state was never cleared. */ - if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription || + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription) { bool did_cleanup = false; @@ -381,11 +381,17 @@ retry: logicalrep_worker_cleanup(w); did_cleanup = true; + + if (worker == NULL) + { + worker = w; + slot = i; + } } } if (did_cleanup) - goto retry; + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); } /* @@ -399,8 +405,6 @@ retry: return false; } - nparallelapplyworkers = logicalrep_pa_worker_count(subid); - /* * Return false if the number of parallel apply workers reached the limit * per subscription. @@ -844,48 +848,42 @@ logicalrep_worker_onexit(int code, Datum arg) int logicalrep_sync_worker_count(Oid subid) { - int i; int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - - /* Search for attached worker for a given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) - { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - - if (isTablesyncWorker(w) && w->subid == subid) - res++; - } - + logicalrep_worker_count(subid, &res, NULL); return res; } /* - * Count the number of registered (but not necessarily running) parallel apply - * workers for a subscription. + * Count the number of registered (but not necessarily running) sync workers + * and parallel apply workers for a subscription. */ -static int -logicalrep_pa_worker_count(Oid subid) +static void +logicalrep_worker_count(Oid subid, int *nsync, int *npa) { - int i; - int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + if (nsync != NULL) + *nsync = 0; + if (npa != NULL) + *npa = 0; + /* - * Scan all attached parallel apply workers, only counting those which - * have the given subscription id. + * Scan all attached sync and parallel apply workers, only counting those + * which have the given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) + for (int i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isParallelApplyWorker(w) && w->subid == subid) - res++; + if (w->subid == subid) + { + if (nsync != NULL && isTablesyncWorker(w)) + (*nsync)++; + if (npa != NULL && isParallelApplyWorker(w)) + (*npa)++; + } } - - return res; } /* -- 2.49.0