From 75302cba302b83ce2a6d6eaf30b163f473b87276 Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Wed, 21 Feb 2024 16:36:25 +0900
Subject: [PATCH v2 1/2] injection_points: Add routines to wait and wake
 processes

This commit is made of two parts:
- A new callback that can be attached to a process to make it wait on a
condition variable.  The condition checked is registered in shared
memory by the module injection_points.
- A new SQL function to update the shared state and broadcast the update
using a condition variable.

The shared state used by the module is registered using the DSM
registry, and is optional.
---
 .../injection_points--1.0.sql                 |  10 ++
 .../injection_points/injection_points.c       | 151 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   1 +
 3 files changed, 162 insertions(+)

diff --git a/src/test/modules/injection_points/injection_points--1.0.sql b/src/test/modules/injection_points/injection_points--1.0.sql
index 5944c41716..eed0310cf6 100644
--- a/src/test/modules/injection_points/injection_points--1.0.sql
+++ b/src/test/modules/injection_points/injection_points--1.0.sql
@@ -24,6 +24,16 @@ RETURNS void
 AS 'MODULE_PATHNAME', 'injection_points_run'
 LANGUAGE C STRICT PARALLEL UNSAFE;
 
+--
+-- injection_points_wakeup()
+--
+-- Wakes a condition variable waited on in an injection point.
+--
+CREATE FUNCTION injection_points_wakeup(IN point_name TEXT)
+RETURNS void
+AS 'MODULE_PATHNAME', 'injection_points_wakeup'
+LANGUAGE C STRICT PARALLEL UNSAFE;
+
 --
 -- injection_points_detach()
 --
diff --git a/src/test/modules/injection_points/injection_points.c b/src/test/modules/injection_points/injection_points.c
index e843e6594f..052b20f9c8 100644
--- a/src/test/modules/injection_points/injection_points.c
+++ b/src/test/modules/injection_points/injection_points.c
@@ -18,18 +18,72 @@
 #include "postgres.h"
 
 #include "fmgr.h"
+#include "storage/condition_variable.h"
 #include "storage/lwlock.h"
 #include "storage/shmem.h"
+#include "storage/dsm_registry.h"
 #include "utils/builtins.h"
 #include "utils/injection_point.h"
 #include "utils/wait_event.h"
 
 PG_MODULE_MAGIC;
 
+/* Maximum number of wait usable in injection points at once */
+#define INJ_MAX_WAIT	32
+#define INJ_NAME_MAXLEN	64
+
+/* Shared state information for injection points. */
+typedef struct InjectionPointSharedState
+{
+	/* protects accesses to wait_counts */
+	slock_t		lock;
+
+	/* Counters advancing when injection_points_wakeup() is called */
+	int			wait_counts[INJ_MAX_WAIT];
+
+	/* Names of injection points attached to wait counters */
+	char		name[INJ_MAX_WAIT][INJ_NAME_MAXLEN];
+
+	/*
+	 * Condition variable used for waits and wakeups, checking upon the set of
+	 * wait_counts when waiting.
+	 */
+	ConditionVariable wait_point;
+} InjectionPointSharedState;
+
+/* Pointer to shared-memory state. */
+static InjectionPointSharedState *inj_state = NULL;
+
 extern PGDLLEXPORT void injection_error(const char *name);
 extern PGDLLEXPORT void injection_notice(const char *name);
+extern PGDLLEXPORT void injection_wait(const char *name);
 
 
+static void
+injection_point_init_state(void *ptr)
+{
+	InjectionPointSharedState *state = (InjectionPointSharedState *) ptr;
+
+	SpinLockInit(&state->lock);
+	memset(state->wait_counts, 0, sizeof(state->wait_counts));
+	memset(state->name, 0, sizeof(state->name));
+	ConditionVariableInit(&state->wait_point);
+}
+
+static void
+injection_init_shmem(void)
+{
+	bool		found;
+
+	if (inj_state != NULL)
+		return;
+
+	inj_state = GetNamedDSMSegment("injection_points",
+								   sizeof(InjectionPointSharedState),
+								   injection_point_init_state,
+								   &found);
+}
+
 /* Set of callbacks available to be attached to an injection point. */
 void
 injection_error(const char *name)
@@ -43,6 +97,65 @@ injection_notice(const char *name)
 	elog(NOTICE, "notice triggered for injection point %s", name);
 }
 
+/* Wait on a condition variable, awaken by injection_points_wakeup() */
+void
+injection_wait(const char *name)
+{
+	int			old_wait_counts = -1;
+	int			index = -1;
+	uint32		injection_wait_event = 0;
+
+	if (inj_state == NULL)
+		injection_init_shmem();
+
+	/*
+	 * This custom wait event name is not released, but we don't care much for
+	 * testing as this will be short-lived.
+	 */
+	injection_wait_event = WaitEventExtensionNew(name);
+
+	/*
+	 * Find a free slot to wait for, and register this injection point's name.
+	 */
+	SpinLockAcquire(&inj_state->lock);
+	for (int i = 0; i < INJ_MAX_WAIT; i++)
+	{
+		if (inj_state->name[i][0] == '\0')
+		{
+			index = i;
+			strlcpy(inj_state->name[i], name, INJ_NAME_MAXLEN);
+			old_wait_counts = inj_state->wait_counts[i];
+			break;
+		}
+	}
+	SpinLockRelease(&inj_state->lock);
+
+	if (index < 0)
+		elog(ERROR, "could not find free slot for wait of injection point %s ",
+			 name);
+
+	/* And sleep.. */
+	ConditionVariablePrepareToSleep(&inj_state->wait_point);
+	for (;;)
+	{
+		int			new_wait_counts;
+
+		SpinLockAcquire(&inj_state->lock);
+		new_wait_counts = inj_state->wait_counts[index];
+		SpinLockRelease(&inj_state->lock);
+
+		if (old_wait_counts != new_wait_counts)
+			break;
+		ConditionVariableSleep(&inj_state->wait_point, injection_wait_event);
+	}
+	ConditionVariableCancelSleep();
+
+	/* Remove us from the waiting list */
+	SpinLockAcquire(&inj_state->lock);
+	inj_state->name[index][0] = '\0';
+	SpinLockRelease(&inj_state->lock);
+}
+
 /*
  * SQL function for creating an injection point.
  */
@@ -58,6 +171,8 @@ injection_points_attach(PG_FUNCTION_ARGS)
 		function = "injection_error";
 	else if (strcmp(action, "notice") == 0)
 		function = "injection_notice";
+	else if (strcmp(action, "wait") == 0)
+		function = "injection_wait";
 	else
 		elog(ERROR, "incorrect action \"%s\" for injection point creation", action);
 
@@ -80,6 +195,42 @@ injection_points_run(PG_FUNCTION_ARGS)
 	PG_RETURN_VOID();
 }
 
+/*
+ * SQL function for waking a condition variable.
+ */
+PG_FUNCTION_INFO_V1(injection_points_wakeup);
+Datum
+injection_points_wakeup(PG_FUNCTION_ARGS)
+{
+	char	   *name = text_to_cstring(PG_GETARG_TEXT_PP(0));
+	int			index = -1;
+
+	if (inj_state == NULL)
+		injection_init_shmem();
+
+	/* First bump the wait counter for the injection point to wake */
+	SpinLockAcquire(&inj_state->lock);
+	for (int i = 0; i < INJ_MAX_WAIT; i++)
+	{
+		if (strcmp(name, inj_state->name[i]) == 0)
+		{
+			index = i;
+			break;
+		}
+	}
+	if (index < 0)
+	{
+		SpinLockRelease(&inj_state->lock);
+		elog(ERROR, "could not find injection point %s to wake", name);
+	}
+	inj_state->wait_counts[index]++;
+	SpinLockRelease(&inj_state->lock);
+
+	/* And broadcast the change for the waiters */
+	ConditionVariableBroadcast(&inj_state->wait_point);
+	PG_RETURN_VOID();
+}
+
 /*
  * SQL function for dropping an injection point.
  */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index d808aad8b0..d7eca00502 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1208,6 +1208,7 @@ InitializeDSMForeignScan_function
 InitializeWorkerForeignScan_function
 InjectionPointCacheEntry
 InjectionPointEntry
+InjectionPointSharedState
 InlineCodeBlock
 InsertStmt
 Instrumentation
-- 
2.43.0

