From 619b10ec409185995a4a3ffd56972f1efa493c45 Mon Sep 17 00:00:00 2001 From: Dmitrii Dolgov <9erthalion6@gmail.com> Date: Fri, 4 Apr 2025 21:46:14 +0200 Subject: [PATCH v4 5/8] Introduce pss_barrierReceivedGeneration Currently WaitForProcSignalBarrier allows to make sure the message sent via EmitProcSignalBarrier was processed by all ProcSignal mechanism participants. Add pss_barrierReceivedGeneration alongside with pss_barrierGeneration, which will be updated when a process has received the message, but not processed it yet. This makes it possible to support a new mode of waiting, when ProcSignal participants want to synchronize message processing. To do that, a participant can wait via WaitForProcSignalBarrierReceived when processing a message, effectively making sure that all processes are going to start processing ProcSignalBarrier simultaneously. --- src/backend/storage/ipc/procsignal.c | 67 ++++++++++++++++++++++------ src/include/storage/procsignal.h | 1 + 2 files changed, 54 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index b7c39a4c5f0..8e313ad9bf8 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -58,7 +58,10 @@ * of it. For such use cases, we set a bit in pss_barrierCheckMask and then * increment the current "barrier generation"; when the new barrier generation * (or greater) appears in the pss_barrierGeneration flag of every process, - * we know that the message has been received everywhere. + * we know that the message has been received and processed everywhere. In case + * if we only need to know only that the message was received everywhere (e.g. + * receiving processes need to handle the message in a coordinated fashion) + * use pss_barrierReceivedGeneration in the same way. */ typedef struct { @@ -70,6 +73,7 @@ typedef struct /* Barrier-related fields (not protected by pss_mutex) */ pg_atomic_uint64 pss_barrierGeneration; + pg_atomic_uint64 pss_barrierReceivedGeneration; pg_atomic_uint32 pss_barrierCheckMask; ConditionVariable pss_barrierCV; } ProcSignalSlot; @@ -151,6 +155,8 @@ ProcSignalShmemInit(void) slot->pss_cancel_key_len = 0; MemSet(slot->pss_signalFlags, 0, sizeof(slot->pss_signalFlags)); pg_atomic_init_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX); + pg_atomic_init_u64(&slot->pss_barrierReceivedGeneration, + PG_UINT64_MAX); pg_atomic_init_u32(&slot->pss_barrierCheckMask, 0); ConditionVariableInit(&slot->pss_barrierCV); } @@ -198,6 +204,8 @@ ProcSignalInit(char *cancel_key, int cancel_key_len) barrier_generation = pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration); pg_atomic_write_u64(&slot->pss_barrierGeneration, barrier_generation); + pg_atomic_write_u64(&slot->pss_barrierReceivedGeneration, + barrier_generation); if (cancel_key_len > 0) memcpy(slot->pss_cancel_key, cancel_key, cancel_key_len); @@ -262,6 +270,7 @@ CleanupProcSignalState(int status, Datum arg) * no barrier waits block on it. */ pg_atomic_write_u64(&slot->pss_barrierGeneration, PG_UINT64_MAX); + pg_atomic_write_u64(&slot->pss_barrierReceivedGeneration, PG_UINT64_MAX); SpinLockRelease(&slot->pss_mutex); @@ -415,12 +424,8 @@ EmitProcSignalBarrier(ProcSignalBarrierType type) return generation; } -/* - * WaitForProcSignalBarrier - wait until it is guaranteed that all changes - * requested by a specific call to EmitProcSignalBarrier() have taken effect. - */ -void -WaitForProcSignalBarrier(uint64 generation) +static void +WaitForProcSignalBarrierInternal(uint64 generation, bool receivedOnly) { Assert(generation <= pg_atomic_read_u64(&ProcSignal->psh_barrierGeneration)); @@ -435,12 +440,17 @@ WaitForProcSignalBarrier(uint64 generation) uint64 oldval; /* - * It's important that we check only pss_barrierGeneration here and - * not pss_barrierCheckMask. Bits in pss_barrierCheckMask get cleared - * before the barrier is actually absorbed, but pss_barrierGeneration + * It's important that we check only pss_barrierGeneration & + * pss_barrierGeneration here and not pss_barrierCheckMask. Bits in + * pss_barrierCheckMask get cleared before the barrier is actually + * absorbed, but pss_barrierGeneration & pss_barrierReceivedGeneration * is updated only afterward. */ - oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration); + if (receivedOnly) + oldval = pg_atomic_read_u64(&slot->pss_barrierReceivedGeneration); + else + oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration); + while (oldval < generation) { if (ConditionVariableTimedSleep(&slot->pss_barrierCV, @@ -449,7 +459,11 @@ WaitForProcSignalBarrier(uint64 generation) ereport(LOG, (errmsg("still waiting for backend with PID %d to accept ProcSignalBarrier", (int) pg_atomic_read_u32(&slot->pss_pid)))); - oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration); + + if (receivedOnly) + oldval = pg_atomic_read_u64(&slot->pss_barrierReceivedGeneration); + else + oldval = pg_atomic_read_u64(&slot->pss_barrierGeneration); } ConditionVariableCancelSleep(); } @@ -463,12 +477,33 @@ WaitForProcSignalBarrier(uint64 generation) * The caller is probably calling this function because it wants to read * the shared state or perform further writes to shared state once all * backends are known to have absorbed the barrier. However, the read of - * pss_barrierGeneration was performed unlocked; insert a memory barrier - * to separate it from whatever follows. + * pss_barrierGeneration & pss_barrierReceivedGeneration was performed + * unlocked; insert a memory barrier to separate it from whatever follows. */ pg_memory_barrier(); } +/* + * WaitForProcSignalBarrier - wait until it is guaranteed that all changes + * requested by a specific call to EmitProcSignalBarrier() have taken effect. + */ +void +WaitForProcSignalBarrier(uint64 generation) +{ + WaitForProcSignalBarrierInternal(generation, false); +} + +/* + * WaitForProcSignalBarrierReceived - wait until it is guaranteed that all + * backends have observed the message sent by a specific call to + * EmitProcSignalBarrier(). + */ +void +WaitForProcSignalBarrierReceived(uint64 generation) +{ + WaitForProcSignalBarrierInternal(generation, true); +} + /* * Handle receipt of an interrupt indicating a global barrier event. * @@ -522,6 +557,10 @@ ProcessProcSignalBarrier(void) if (local_gen == shared_gen) return; + /* The message is observed, record that */ + pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierReceivedGeneration, + shared_gen); + /* * Get and clear the flags that are set for this backend. Note that * pg_atomic_exchange_u32 is a full barrier, so we're guaranteed that the diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 016dfd9b3f6..defd8b66a19 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -79,6 +79,7 @@ extern void SendCancelRequest(int backendPID, char *cancel_key, int cancel_key_l extern uint64 EmitProcSignalBarrier(ProcSignalBarrierType type); extern void WaitForProcSignalBarrier(uint64 generation); +extern void WaitForProcSignalBarrierReceived(uint64 generation); extern void ProcessProcSignalBarrier(void); extern void procsignal_sigusr1_handler(SIGNAL_ARGS); -- 2.45.1