diff --git a/doc/src/sgml/ref/listen.sgml b/doc/src/sgml/ref/listen.sgml index 6c1f09bd455..fe0374a6006 100644 --- a/doc/src/sgml/ref/listen.sgml +++ b/doc/src/sgml/ref/listen.sgml @@ -21,7 +21,7 @@ PostgreSQL documentation -LISTEN channel +LISTEN { channel | pattern } @@ -30,17 +30,18 @@ LISTEN channel LISTEN registers the current session as a - listener on the notification channel named channel. + listener on the notification channels named channel or whose name match + the pattern. If the current session is already registered as a listener for - this notification channel, nothing is done. + these notification channels, nothing is done. Whenever the command NOTIFY channel is invoked, either by this session or another one connected to the same database, all - the sessions currently listening on that notification channel are + the sessions currently listening on those notification channels are notified, and each will in turn notify its connected client application. @@ -77,6 +78,15 @@ LISTEN channel + + + pattern + + + Pattern of notification channel names ( expression). + + + @@ -130,6 +140,17 @@ LISTEN channel LISTEN virtual; NOTIFY virtual; Asynchronous notification "virtual" received from server process with PID 8448. + + + Configure and execute a listen pattern from + psql: + + +LISTEN 'virtual%'; +NOTIFY virtual0; +Asynchronous notification "virtual0" received from server process with PID 8448. +NOTIFY virtual1; +Asynchronous notification "virtual1" received from server process with PID 8448. diff --git a/doc/src/sgml/ref/unlisten.sgml b/doc/src/sgml/ref/unlisten.sgml index 687bf485c94..332aba50fd2 100644 --- a/doc/src/sgml/ref/unlisten.sgml +++ b/doc/src/sgml/ref/unlisten.sgml @@ -21,7 +21,7 @@ PostgreSQL documentation -UNLISTEN { channel | * } +UNLISTEN { channel | pattern | * } @@ -33,10 +33,11 @@ UNLISTEN { channel | * } registration for NOTIFY events. UNLISTEN cancels any existing registration of the current PostgreSQL session as a - listener on the notification channel named channel. The special wildcard - * cancels all listener registrations for the - current session. + listener on the notification channels named channel or whose name match + the pattern. + The special wildcard * cancels all listener + registrations for the current session. @@ -60,6 +61,15 @@ UNLISTEN { channel | * } + + pattern + + + Pattern of notification channel names ( expression). + + + + * diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..5488e50ddf2 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -133,6 +133,7 @@ #include "access/slru.h" #include "access/transam.h" #include "access/xact.h" +#include "catalog/pg_collation.h" #include "catalog/pg_database.h" #include "commands/async.h" #include "common/hashfn.h" @@ -312,6 +313,12 @@ static SlruCtlData NotifyCtlData; #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ +typedef struct +{ + bool ispatt; + char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ +} ListenChannel; + /* * listenChannels identifies the channels we are actually listening to * (ie, have committed a LISTEN on). It is a simple list of channel names, @@ -339,6 +346,7 @@ typedef enum typedef struct { ListenActionKind action; + bool ispatt; char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ } ListenAction; @@ -430,13 +438,13 @@ int max_notify_queue_pages = 1048576; /* local function prototypes */ static inline int64 asyncQueuePageDiff(int64 p, int64 q); static inline bool asyncQueuePagePrecedes(int64 p, int64 q); -static void queue_listen(ListenActionKind action, const char *channel); +static void queue_listen(ListenActionKind action, const bool ispatt, const char *channel); static void Async_UnlistenOnExit(int code, Datum arg); static void Exec_ListenPreCommit(void); -static void Exec_ListenCommit(const char *channel); -static void Exec_UnlistenCommit(const char *channel); +static void Exec_ListenCommit(const bool ispatt, const char *channel); +static void Exec_UnlistenCommit(const bool ispatt, const char *channel); static void Exec_UnlistenAllCommit(void); -static bool IsListeningOn(const char *channel); +static bool IsListeningOn(const bool trymatch, const bool ispatt, const char *channel); static void asyncQueueUnregister(void); static bool asyncQueueIsFull(void); static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); @@ -687,7 +695,7 @@ Async_Notify(const char *channel, const char *payload) * commit. */ static void -queue_listen(ListenActionKind action, const char *channel) +queue_listen(ListenActionKind action, const bool ispatt, const char *channel) { MemoryContext oldcontext; ListenAction *actrec; @@ -705,6 +713,7 @@ queue_listen(ListenActionKind action, const char *channel) actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) + strlen(channel) + 1); actrec->action = action; + actrec->ispatt = ispatt; strcpy(actrec->channel, channel); if (pendingActions == NULL || my_level > pendingActions->nestingLevel) @@ -735,12 +744,12 @@ queue_listen(ListenActionKind action, const char *channel) * This is executed by the SQL listen command. */ void -Async_Listen(const char *channel) +Async_Listen(const bool ispatt, const char *channel) { if (Trace_notify) elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); - queue_listen(LISTEN_LISTEN, channel); + queue_listen(LISTEN_LISTEN, ispatt, channel); } /* @@ -749,7 +758,7 @@ Async_Listen(const char *channel) * This is executed by the SQL unlisten command. */ void -Async_Unlisten(const char *channel) +Async_Unlisten(const bool ispatt, const char *channel) { if (Trace_notify) elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); @@ -758,7 +767,7 @@ Async_Unlisten(const char *channel) if (pendingActions == NULL && !unlistenExitRegistered) return; - queue_listen(LISTEN_UNLISTEN, channel); + queue_listen(LISTEN_UNLISTEN, ispatt, channel); } /* @@ -776,7 +785,7 @@ Async_UnlistenAll(void) if (pendingActions == NULL && !unlistenExitRegistered) return; - queue_listen(LISTEN_UNLISTEN_ALL, ""); + queue_listen(LISTEN_UNLISTEN_ALL, false, ""); } /* @@ -803,10 +812,31 @@ pg_listening_channels(PG_FUNCTION_ARGS) if (funcctx->call_cntr < list_length(listenChannels)) { - char *channel = (char *) list_nth(listenChannels, - funcctx->call_cntr); + ListenChannel *chnl; + + chnl = (ListenChannel *)list_nth(listenChannels, funcctx->call_cntr); + + if (chnl->ispatt) + { + Size plen; + char *result; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + plen = strlen(chnl->channel); + result = (char *)palloc(plen + 3); + result[0] = '\''; + memcpy(result + 1, chnl->channel, plen); + result[plen + 1] = '\''; + result[plen + 2] = '\0'; - SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); + MemoryContextSwitchTo(oldcontext); + + SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(result)); + } + else + SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(chnl->channel)); } SRF_RETURN_DONE(funcctx); @@ -989,10 +1019,10 @@ AtCommit_Notify(void) switch (actrec->action) { case LISTEN_LISTEN: - Exec_ListenCommit(actrec->channel); + Exec_ListenCommit(actrec->ispatt, actrec->channel); break; case LISTEN_UNLISTEN: - Exec_UnlistenCommit(actrec->channel); + Exec_UnlistenCommit(actrec->ispatt, actrec->channel); break; case LISTEN_UNLISTEN_ALL: Exec_UnlistenAllCommit(); @@ -1133,12 +1163,13 @@ Exec_ListenPreCommit(void) * Add the channel to the list of channels we are listening on. */ static void -Exec_ListenCommit(const char *channel) +Exec_ListenCommit(const bool ispatt, const char *channel) { - MemoryContext oldcontext; + MemoryContext oldcontext; + ListenChannel *chnl; /* Do nothing if we are already listening on this channel */ - if (IsListeningOn(channel)) + if (IsListeningOn(false, ispatt, channel)) return; /* @@ -1150,7 +1181,15 @@ Exec_ListenCommit(const char *channel) * later. */ oldcontext = MemoryContextSwitchTo(TopMemoryContext); - listenChannels = lappend(listenChannels, pstrdup(channel)); + + chnl = (ListenChannel *) palloc(offsetof(ListenChannel, channel) + + strlen(channel) + 1); + + chnl->ispatt = ispatt; + strcpy(chnl->channel, channel); + + listenChannels = lappend(listenChannels, chnl); + MemoryContextSwitchTo(oldcontext); } @@ -1160,7 +1199,7 @@ Exec_ListenCommit(const char *channel) * Remove the specified channel name from listenChannels. */ static void -Exec_UnlistenCommit(const char *channel) +Exec_UnlistenCommit(const bool ispatt, const char *channel) { ListCell *q; @@ -1169,9 +1208,12 @@ Exec_UnlistenCommit(const char *channel) foreach(q, listenChannels) { - char *lchan = (char *) lfirst(q); + ListenChannel *lchan = (ListenChannel *) lfirst(q); + + if (lchan->ispatt != ispatt) + continue; - if (strcmp(lchan, channel) == 0) + if (strcmp(lchan->channel, channel) == 0) { listenChannels = foreach_delete_current(listenChannels, q); pfree(lchan); @@ -1209,16 +1251,37 @@ Exec_UnlistenAllCommit(void) * fairly short, though. */ static bool -IsListeningOn(const char *channel) +IsListeningOn(const bool trymatch, const bool ispatt, const char *channel) { ListCell *p; foreach(p, listenChannels) { - char *lchan = (char *) lfirst(p); + ListenChannel *lchan = (ListenChannel *) lfirst(p); - if (strcmp(lchan, channel) == 0) - return true; + if (trymatch) + { + Assert(!ispatt); + + if (lchan->ispatt) + { + Datum s = PointerGetDatum(cstring_to_text(channel)); + Datum p = PointerGetDatum(cstring_to_text(lchan->channel)); + + if (DatumGetBool(DirectFunctionCall2Coll(textlike, DEFAULT_COLLATION_OID, s, p))) + return true; + } + else if (strcmp(lchan->channel, channel) == 0) + return true; + } + else + { + if (ispatt == lchan->ispatt) + { + if (strcmp(lchan->channel, channel) == 0) + return true; + } + } } return false; } @@ -2071,7 +2134,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, /* qe->data is the null-terminated channel name */ char *channel = qe->data; - if (IsListeningOn(channel)) + if (IsListeningOn(true, false, channel)) { /* payload follows channel name */ char *payload = qe->data + strlen(channel) + 1; diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 7d99c9355c6..e4031b4a038 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -11034,6 +11034,14 @@ ListenStmt: LISTEN ColId { ListenStmt *n = makeNode(ListenStmt); + n->conditionname = $2; + $$ = (Node *) n; + } + | LISTEN Sconst + { + ListenStmt *n = makeNode(ListenStmt); + + n->ispatt = true; n->conditionname = $2; $$ = (Node *) n; } @@ -11054,6 +11062,14 @@ UnlistenStmt: n->conditionname = NULL; $$ = (Node *) n; } + | UNLISTEN Sconst + { + UnlistenStmt *n = makeNode(UnlistenStmt); + + n->ispatt = true; + n->conditionname = $2; + $$ = (Node *) n; + } ; diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 25fe3d58016..993cc152909 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -824,7 +824,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, errmsg("cannot execute %s within a background process", "LISTEN"))); - Async_Listen(stmt->conditionname); + Async_Listen(stmt->ispatt, stmt->conditionname); } break; @@ -834,7 +834,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, CheckRestrictedOperation("UNLISTEN"); if (stmt->conditionname) - Async_Unlisten(stmt->conditionname); + Async_Unlisten(stmt->ispatt, stmt->conditionname); else Async_UnlistenAll(); } diff --git a/src/include/commands/async.h b/src/include/commands/async.h index f75c3df9556..4cb7ca38a5e 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -28,8 +28,8 @@ extern void NotifyMyFrontEnd(const char *channel, /* notify-related SQL statements */ extern void Async_Notify(const char *channel, const char *payload); -extern void Async_Listen(const char *channel); -extern void Async_Unlisten(const char *channel); +extern void Async_Listen(const bool ispatt, const char *channel); +extern void Async_Unlisten(const bool ispatt, const char *channel); extern void Async_UnlistenAll(void); /* perform (or cancel) outbound notify processing at transaction commit */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 0b208f51bdd..878049ec3b5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3720,6 +3720,7 @@ typedef struct NotifyStmt typedef struct ListenStmt { NodeTag type; + bool ispatt; /* condition name is a pattern */ char *conditionname; /* condition name to listen on */ } ListenStmt; @@ -3730,6 +3731,7 @@ typedef struct ListenStmt typedef struct UnlistenStmt { NodeTag type; + bool ispatt; /* condition name is a pattern */ char *conditionname; /* name to unlisten on, or NULL for all */ } UnlistenStmt; diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out index 556e1805893..f7cbfa26128 100644 --- a/src/test/isolation/expected/async-notify.out +++ b/src/test/isolation/expected/async-notify.out @@ -1,4 +1,4 @@ -Parsed test spec with 3 sessions +Parsed test spec with 4 sessions starting permutation: listenc notify1 notify2 notify3 notifyf step listenc: LISTEN c1; LISTEN c2; @@ -104,6 +104,16 @@ step l2commit: COMMIT; listener2: NOTIFY "c1" with payload "" from notifier step l2stop: UNLISTEN *; +starting permutation: l3listen l3begin notify1 notify2 l3commit l3stop +step l3listen: LISTEN 'c_'; +step l3begin: BEGIN; +step notify1: NOTIFY c1; +step notify2: NOTIFY c2, 'payload'; +step l3commit: COMMIT; +listener3: NOTIFY "c1" with payload "" from notifier +listener3: NOTIFY "c2" with payload "payload" from notifier +step l3stop: UNLISTEN *; + starting permutation: llisten lbegin usage bignotify usage step llisten: LISTEN c1; LISTEN c2; step lbegin: BEGIN; diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec index 0b8cfd91083..26113b5fe6e 100644 --- a/src/test/isolation/specs/async-notify.spec +++ b/src/test/isolation/specs/async-notify.spec @@ -53,6 +53,11 @@ step l2begin { BEGIN; } step l2commit { COMMIT; } step l2stop { UNLISTEN *; } +session listener3 +step l3listen { LISTEN 'c_'; } +step l3begin { BEGIN; } +step l3commit { COMMIT; } +step l3stop { UNLISTEN *; } # Trivial cases. permutation listenc notify1 notify2 notify3 notifyf @@ -72,6 +77,7 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck # Check for bug when initial listen is only action in a serializable xact, # and notify queue is not empty permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop +permutation l3listen l3begin notify1 notify2 l3commit l3stop # Verify that pg_notification_queue_usage correctly reports a non-zero result, # after submitting notifications while another connection is listening for diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out index 19cbe38e636..40d51399c9b 100644 --- a/src/test/regress/expected/async.out +++ b/src/test/regress/expected/async.out @@ -31,6 +31,22 @@ ERROR: channel name too long NOTIFY notify_async2; LISTEN notify_async2; UNLISTEN notify_async2; +UNLISTEN *; +NOTIFY notify_async100; +NOTIFY notify_async200; +LISTEN 'notify_async%'; +SELECT pg_listening_channels(); + pg_listening_channels +----------------------- + 'notify_async%' +(1 row) + +UNLISTEN 'notify_async%'; +SELECT pg_listening_channels(); + pg_listening_channels +----------------------- +(0 rows) + UNLISTEN *; -- Should return zero while there are no pending notifications. -- src/test/isolation/specs/async-notify.spec tests for actual usage. diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql index 40f6e015387..8f2bd716bda 100644 --- a/src/test/regress/sql/async.sql +++ b/src/test/regress/sql/async.sql @@ -18,6 +18,14 @@ LISTEN notify_async2; UNLISTEN notify_async2; UNLISTEN *; +NOTIFY notify_async100; +NOTIFY notify_async200; +LISTEN 'notify_async%'; +SELECT pg_listening_channels(); +UNLISTEN 'notify_async%'; +SELECT pg_listening_channels(); +UNLISTEN *; + -- Should return zero while there are no pending notifications. -- src/test/isolation/specs/async-notify.spec tests for actual usage. SELECT pg_notification_queue_usage();