From e5bdcc64258e4b68f42d868b13612b771e153268 Mon Sep 17 00:00:00 2001 From: Fujii Masao Date: Mon, 28 Apr 2025 14:29:26 +0900 Subject: [PATCH v2] Fix bug that could block the startup of parallel apply workers. If a logical replication worker fails to start and its parent crashes while waiting, its worker slot can remain marked as "in use". This can prevent new workers from starting, as the launcher may not find a free slot or may incorrectly think the sync or parallel apply worker limits have been reached. To handle this, the launcher already performs garbage collection when no free slot is found or when the sync worker limit is hit, and then retries launching workers. However, it previously did not trigger garbage collection when the parallel apply worker limit was reached. As a result, stale slots could block new parallel apply workers from starting, even though they could have been launched after cleanup. This commit fixes the issue by triggering garbage collection when the parallel apply worker limit is reached as well. If stale slots are cleared and the number of parallel apply workers drops below the limit, new parallel apply worker can then be started successfully. Back-patch to v16, where parallel apply workers were introduced. --- src/backend/replication/logical/launcher.c | 65 +++++++++++----------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8395ae7b23c..ebf3220da01 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -102,7 +102,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); -static int logicalrep_pa_worker_count(Oid subid); +static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); @@ -350,16 +350,21 @@ retry: } } - nsyncworkers = logicalrep_sync_worker_count(subid); + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); now = GetCurrentTimestamp(); /* - * If we didn't find a free slot, try to do garbage collection. The - * reason we do this is because if some worker failed to start up and its - * parent has crashed while waiting, the in_use state was never cleared. + * If we can't start a new logical replication background worker because + * no free slot is available, or because the number of sync workers or + * parallel apply workers has reached the limit per subscriptoin, try + * running garbage collection. The reason we do this is because if some + * workers failed to start up and their parent has crashed while waiting, + * the in_use state was never cleared. By freeing up these stale worker + * slots, we may be able to start a new worker. */ - if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription || + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription) { bool did_cleanup = false; @@ -399,8 +404,6 @@ retry: return false; } - nparallelapplyworkers = logicalrep_pa_worker_count(subid); - /* * Return false if the number of parallel apply workers reached the limit * per subscription. @@ -831,48 +834,42 @@ logicalrep_worker_onexit(int code, Datum arg) int logicalrep_sync_worker_count(Oid subid) { - int i; int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - - /* Search for attached worker for a given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) - { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - - if (w->subid == subid && OidIsValid(w->relid)) - res++; - } - + logicalrep_worker_count(subid, &res, NULL); return res; } /* - * Count the number of registered (but not necessarily running) parallel apply - * workers for a subscription. + * Count the number of registered (but not necessarily running) sync workers + * and parallel apply workers for a subscription. */ -static int -logicalrep_pa_worker_count(Oid subid) +static void +logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply) { - int i; - int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + if (nsync != NULL) + *nsync = 0; + if (nparallelapply != NULL) + *nparallelapply = 0; + /* - * Scan all attached parallel apply workers, only counting those which - * have the given subscription id. + * Scan all attached sync and parallel apply workers, only counting those + * which have the given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) + for (int i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && isParallelApplyWorker(w)) - res++; + if (w->subid == subid) + { + if (nsync != NULL && OidIsValid(w->relid)) + (*nsync)++; + if (nparallelapply != NULL && isParallelApplyWorker(w)) + (*nparallelapply)++; + } } - - return res; } /* -- 2.49.0