diff --git a/doc/src/sgml/ref/listen.sgml b/doc/src/sgml/ref/listen.sgml
index 6c1f09bd455..fe0374a6006 100644
--- a/doc/src/sgml/ref/listen.sgml
+++ b/doc/src/sgml/ref/listen.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
-LISTEN channel
+LISTEN { channel | pattern }
@@ -30,17 +30,18 @@ LISTEN channel
LISTEN registers the current session as a
- listener on the notification channel named channel.
+ listener on the notification channels named channel or whose name match
+ the pattern.
If the current session is already registered as a listener for
- this notification channel, nothing is done.
+ these notification channels, nothing is done.
Whenever the command NOTIFY channel is invoked, either
by this session or another one connected to the same database, all
- the sessions currently listening on that notification channel are
+ the sessions currently listening on those notification channels are
notified, and each will in turn notify its connected client
application.
@@ -77,6 +78,15 @@ LISTEN channel
+
+
+ pattern
+
+
+ Pattern of notification channel names ( expression).
+
+
+
@@ -130,6 +140,17 @@ LISTEN channel
LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
+
+
+ Configure and execute a listen pattern from
+ psql:
+
+
+LISTEN 'virtual%';
+NOTIFY virtual0;
+Asynchronous notification "virtual0" received from server process with PID 8448.
+NOTIFY virtual1;
+Asynchronous notification "virtual1" received from server process with PID 8448.
diff --git a/doc/src/sgml/ref/unlisten.sgml b/doc/src/sgml/ref/unlisten.sgml
index 687bf485c94..332aba50fd2 100644
--- a/doc/src/sgml/ref/unlisten.sgml
+++ b/doc/src/sgml/ref/unlisten.sgml
@@ -21,7 +21,7 @@ PostgreSQL documentation
-UNLISTEN { channel | * }
+UNLISTEN { channel | pattern | * }
@@ -33,10 +33,11 @@ UNLISTEN { channel | * }
registration for NOTIFY events.
UNLISTEN cancels any existing registration of
the current PostgreSQL session as a
- listener on the notification channel named channel. The special wildcard
- * cancels all listener registrations for the
- current session.
+ listener on the notification channels named channel or whose name match
+ the pattern.
+ The special wildcard * cancels all listener
+ registrations for the current session.
@@ -60,6 +61,15 @@ UNLISTEN { channel | * }
+
+ pattern
+
+
+ Pattern of notification channel names ( expression).
+
+
+
+
*
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..5488e50ddf2 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -133,6 +133,7 @@
#include "access/slru.h"
#include "access/transam.h"
#include "access/xact.h"
+#include "catalog/pg_collation.h"
#include "catalog/pg_database.h"
#include "commands/async.h"
#include "common/hashfn.h"
@@ -312,6 +313,12 @@ static SlruCtlData NotifyCtlData;
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
+typedef struct
+{
+ bool ispatt;
+ char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
+} ListenChannel;
+
/*
* listenChannels identifies the channels we are actually listening to
* (ie, have committed a LISTEN on). It is a simple list of channel names,
@@ -339,6 +346,7 @@ typedef enum
typedef struct
{
ListenActionKind action;
+ bool ispatt;
char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */
} ListenAction;
@@ -430,13 +438,13 @@ int max_notify_queue_pages = 1048576;
/* local function prototypes */
static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
-static void queue_listen(ListenActionKind action, const char *channel);
+static void queue_listen(ListenActionKind action, const bool ispatt, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
+static void Exec_ListenCommit(const bool ispatt, const char *channel);
+static void Exec_UnlistenCommit(const bool ispatt, const char *channel);
static void Exec_UnlistenAllCommit(void);
-static bool IsListeningOn(const char *channel);
+static bool IsListeningOn(const bool trymatch, const bool ispatt, const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
@@ -687,7 +695,7 @@ Async_Notify(const char *channel, const char *payload)
* commit.
*/
static void
-queue_listen(ListenActionKind action, const char *channel)
+queue_listen(ListenActionKind action, const bool ispatt, const char *channel)
{
MemoryContext oldcontext;
ListenAction *actrec;
@@ -705,6 +713,7 @@ queue_listen(ListenActionKind action, const char *channel)
actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) +
strlen(channel) + 1);
actrec->action = action;
+ actrec->ispatt = ispatt;
strcpy(actrec->channel, channel);
if (pendingActions == NULL || my_level > pendingActions->nestingLevel)
@@ -735,12 +744,12 @@ queue_listen(ListenActionKind action, const char *channel)
* This is executed by the SQL listen command.
*/
void
-Async_Listen(const char *channel)
+Async_Listen(const bool ispatt, const char *channel)
{
if (Trace_notify)
elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid);
- queue_listen(LISTEN_LISTEN, channel);
+ queue_listen(LISTEN_LISTEN, ispatt, channel);
}
/*
@@ -749,7 +758,7 @@ Async_Listen(const char *channel)
* This is executed by the SQL unlisten command.
*/
void
-Async_Unlisten(const char *channel)
+Async_Unlisten(const bool ispatt, const char *channel)
{
if (Trace_notify)
elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid);
@@ -758,7 +767,7 @@ Async_Unlisten(const char *channel)
if (pendingActions == NULL && !unlistenExitRegistered)
return;
- queue_listen(LISTEN_UNLISTEN, channel);
+ queue_listen(LISTEN_UNLISTEN, ispatt, channel);
}
/*
@@ -776,7 +785,7 @@ Async_UnlistenAll(void)
if (pendingActions == NULL && !unlistenExitRegistered)
return;
- queue_listen(LISTEN_UNLISTEN_ALL, "");
+ queue_listen(LISTEN_UNLISTEN_ALL, false, "");
}
/*
@@ -803,10 +812,31 @@ pg_listening_channels(PG_FUNCTION_ARGS)
if (funcctx->call_cntr < list_length(listenChannels))
{
- char *channel = (char *) list_nth(listenChannels,
- funcctx->call_cntr);
+ ListenChannel *chnl;
+
+ chnl = (ListenChannel *)list_nth(listenChannels, funcctx->call_cntr);
+
+ if (chnl->ispatt)
+ {
+ Size plen;
+ char *result;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+ plen = strlen(chnl->channel);
+ result = (char *)palloc(plen + 3);
+ result[0] = '\'';
+ memcpy(result + 1, chnl->channel, plen);
+ result[plen + 1] = '\'';
+ result[plen + 2] = '\0';
- SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+ MemoryContextSwitchTo(oldcontext);
+
+ SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(result));
+ }
+ else
+ SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(chnl->channel));
}
SRF_RETURN_DONE(funcctx);
@@ -989,10 +1019,10 @@ AtCommit_Notify(void)
switch (actrec->action)
{
case LISTEN_LISTEN:
- Exec_ListenCommit(actrec->channel);
+ Exec_ListenCommit(actrec->ispatt, actrec->channel);
break;
case LISTEN_UNLISTEN:
- Exec_UnlistenCommit(actrec->channel);
+ Exec_UnlistenCommit(actrec->ispatt, actrec->channel);
break;
case LISTEN_UNLISTEN_ALL:
Exec_UnlistenAllCommit();
@@ -1133,12 +1163,13 @@ Exec_ListenPreCommit(void)
* Add the channel to the list of channels we are listening on.
*/
static void
-Exec_ListenCommit(const char *channel)
+Exec_ListenCommit(const bool ispatt, const char *channel)
{
- MemoryContext oldcontext;
+ MemoryContext oldcontext;
+ ListenChannel *chnl;
/* Do nothing if we are already listening on this channel */
- if (IsListeningOn(channel))
+ if (IsListeningOn(false, ispatt, channel))
return;
/*
@@ -1150,7 +1181,15 @@ Exec_ListenCommit(const char *channel)
* later.
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- listenChannels = lappend(listenChannels, pstrdup(channel));
+
+ chnl = (ListenChannel *) palloc(offsetof(ListenChannel, channel) +
+ strlen(channel) + 1);
+
+ chnl->ispatt = ispatt;
+ strcpy(chnl->channel, channel);
+
+ listenChannels = lappend(listenChannels, chnl);
+
MemoryContextSwitchTo(oldcontext);
}
@@ -1160,7 +1199,7 @@ Exec_ListenCommit(const char *channel)
* Remove the specified channel name from listenChannels.
*/
static void
-Exec_UnlistenCommit(const char *channel)
+Exec_UnlistenCommit(const bool ispatt, const char *channel)
{
ListCell *q;
@@ -1169,9 +1208,12 @@ Exec_UnlistenCommit(const char *channel)
foreach(q, listenChannels)
{
- char *lchan = (char *) lfirst(q);
+ ListenChannel *lchan = (ListenChannel *) lfirst(q);
+
+ if (lchan->ispatt != ispatt)
+ continue;
- if (strcmp(lchan, channel) == 0)
+ if (strcmp(lchan->channel, channel) == 0)
{
listenChannels = foreach_delete_current(listenChannels, q);
pfree(lchan);
@@ -1209,16 +1251,37 @@ Exec_UnlistenAllCommit(void)
* fairly short, though.
*/
static bool
-IsListeningOn(const char *channel)
+IsListeningOn(const bool trymatch, const bool ispatt, const char *channel)
{
ListCell *p;
foreach(p, listenChannels)
{
- char *lchan = (char *) lfirst(p);
+ ListenChannel *lchan = (ListenChannel *) lfirst(p);
- if (strcmp(lchan, channel) == 0)
- return true;
+ if (trymatch)
+ {
+ Assert(!ispatt);
+
+ if (lchan->ispatt)
+ {
+ Datum s = PointerGetDatum(cstring_to_text(channel));
+ Datum p = PointerGetDatum(cstring_to_text(lchan->channel));
+
+ if (DatumGetBool(DirectFunctionCall2Coll(textlike, DEFAULT_COLLATION_OID, s, p)))
+ return true;
+ }
+ else if (strcmp(lchan->channel, channel) == 0)
+ return true;
+ }
+ else
+ {
+ if (ispatt == lchan->ispatt)
+ {
+ if (strcmp(lchan->channel, channel) == 0)
+ return true;
+ }
+ }
}
return false;
}
@@ -2071,7 +2134,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
/* qe->data is the null-terminated channel name */
char *channel = qe->data;
- if (IsListeningOn(channel))
+ if (IsListeningOn(true, false, channel))
{
/* payload follows channel name */
char *payload = qe->data + strlen(channel) + 1;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 7d99c9355c6..e4031b4a038 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -11034,6 +11034,14 @@ ListenStmt: LISTEN ColId
{
ListenStmt *n = makeNode(ListenStmt);
+ n->conditionname = $2;
+ $$ = (Node *) n;
+ }
+ | LISTEN Sconst
+ {
+ ListenStmt *n = makeNode(ListenStmt);
+
+ n->ispatt = true;
n->conditionname = $2;
$$ = (Node *) n;
}
@@ -11054,6 +11062,14 @@ UnlistenStmt:
n->conditionname = NULL;
$$ = (Node *) n;
}
+ | UNLISTEN Sconst
+ {
+ UnlistenStmt *n = makeNode(UnlistenStmt);
+
+ n->ispatt = true;
+ n->conditionname = $2;
+ $$ = (Node *) n;
+ }
;
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 25fe3d58016..993cc152909 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -824,7 +824,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
errmsg("cannot execute %s within a background process",
"LISTEN")));
- Async_Listen(stmt->conditionname);
+ Async_Listen(stmt->ispatt, stmt->conditionname);
}
break;
@@ -834,7 +834,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
CheckRestrictedOperation("UNLISTEN");
if (stmt->conditionname)
- Async_Unlisten(stmt->conditionname);
+ Async_Unlisten(stmt->ispatt, stmt->conditionname);
else
Async_UnlistenAll();
}
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..4cb7ca38a5e 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -28,8 +28,8 @@ extern void NotifyMyFrontEnd(const char *channel,
/* notify-related SQL statements */
extern void Async_Notify(const char *channel, const char *payload);
-extern void Async_Listen(const char *channel);
-extern void Async_Unlisten(const char *channel);
+extern void Async_Listen(const bool ispatt, const char *channel);
+extern void Async_Unlisten(const bool ispatt, const char *channel);
extern void Async_UnlistenAll(void);
/* perform (or cancel) outbound notify processing at transaction commit */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 0b208f51bdd..878049ec3b5 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3720,6 +3720,7 @@ typedef struct NotifyStmt
typedef struct ListenStmt
{
NodeTag type;
+ bool ispatt; /* condition name is a pattern */
char *conditionname; /* condition name to listen on */
} ListenStmt;
@@ -3730,6 +3731,7 @@ typedef struct ListenStmt
typedef struct UnlistenStmt
{
NodeTag type;
+ bool ispatt; /* condition name is a pattern */
char *conditionname; /* name to unlisten on, or NULL for all */
} UnlistenStmt;
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..f7cbfa26128 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 4 sessions
starting permutation: listenc notify1 notify2 notify3 notifyf
step listenc: LISTEN c1; LISTEN c2;
@@ -104,6 +104,16 @@ step l2commit: COMMIT;
listener2: NOTIFY "c1" with payload "" from notifier
step l2stop: UNLISTEN *;
+starting permutation: l3listen l3begin notify1 notify2 l3commit l3stop
+step l3listen: LISTEN 'c_';
+step l3begin: BEGIN;
+step notify1: NOTIFY c1;
+step notify2: NOTIFY c2, 'payload';
+step l3commit: COMMIT;
+listener3: NOTIFY "c1" with payload "" from notifier
+listener3: NOTIFY "c2" with payload "payload" from notifier
+step l3stop: UNLISTEN *;
+
starting permutation: llisten lbegin usage bignotify usage
step llisten: LISTEN c1; LISTEN c2;
step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..26113b5fe6e 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -53,6 +53,11 @@ step l2begin { BEGIN; }
step l2commit { COMMIT; }
step l2stop { UNLISTEN *; }
+session listener3
+step l3listen { LISTEN 'c_'; }
+step l3begin { BEGIN; }
+step l3commit { COMMIT; }
+step l3stop { UNLISTEN *; }
# Trivial cases.
permutation listenc notify1 notify2 notify3 notifyf
@@ -72,6 +77,7 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
# Check for bug when initial listen is only action in a serializable xact,
# and notify queue is not empty
permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
+permutation l3listen l3begin notify1 notify2 l3commit l3stop
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
# after submitting notifications while another connection is listening for
diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out
index 19cbe38e636..40d51399c9b 100644
--- a/src/test/regress/expected/async.out
+++ b/src/test/regress/expected/async.out
@@ -31,6 +31,22 @@ ERROR: channel name too long
NOTIFY notify_async2;
LISTEN notify_async2;
UNLISTEN notify_async2;
+UNLISTEN *;
+NOTIFY notify_async100;
+NOTIFY notify_async200;
+LISTEN 'notify_async%';
+SELECT pg_listening_channels();
+ pg_listening_channels
+-----------------------
+ 'notify_async%'
+(1 row)
+
+UNLISTEN 'notify_async%';
+SELECT pg_listening_channels();
+ pg_listening_channels
+-----------------------
+(0 rows)
+
UNLISTEN *;
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql
index 40f6e015387..8f2bd716bda 100644
--- a/src/test/regress/sql/async.sql
+++ b/src/test/regress/sql/async.sql
@@ -18,6 +18,14 @@ LISTEN notify_async2;
UNLISTEN notify_async2;
UNLISTEN *;
+NOTIFY notify_async100;
+NOTIFY notify_async200;
+LISTEN 'notify_async%';
+SELECT pg_listening_channels();
+UNLISTEN 'notify_async%';
+SELECT pg_listening_channels();
+UNLISTEN *;
+
-- Should return zero while there are no pending notifications.
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
SELECT pg_notification_queue_usage();