diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index ab5ef25..0bb93b4 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -810,7 +810,22 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) case 'A': /* NotifyResponse */ { /* Propagate NotifyResponse. */ - pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); + int save_client_encoding; + int32 pid; + const char *channel; + const char *payload; + + save_client_encoding = pg_get_client_encoding(); + SetClientEncoding(GetDatabaseEncoding()); + + pid = pq_getmsgint(msg, 4); + channel = pq_getmsgstring(msg); + payload = pq_getmsgstring(msg); + pq_endmessage(msg); + + SetClientEncoding(save_client_encoding); + + NotifyMyFrontEnd(channel, payload, pid); break; } diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index c39ac3a..716f1c3 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -390,9 +390,6 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, char *page_buffer); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(void); -static void NotifyMyFrontEnd(const char *channel, - const char *payload, - int32 srcPid); static bool AsyncExistsPendingNotify(const char *channel, const char *payload); static void ClearPendingActionsAndNotifies(void); @@ -2076,7 +2073,7 @@ ProcessIncomingNotify(void) /* * Send NOTIFY message to my front end. */ -static void +void NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) { if (whereToSendOutput == DestRemote) diff --git a/src/include/commands/async.h b/src/include/commands/async.h index b4c13fa..95559df 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -28,6 +28,10 @@ extern volatile sig_atomic_t notifyInterruptPending; extern Size AsyncShmemSize(void); extern void AsyncShmemInit(void); +extern void NotifyMyFrontEnd(const char *channel, + const char *payload, + int32 srcPid); + /* notify-related SQL statements */ extern void Async_Notify(const char *channel, const char *payload); extern void Async_Listen(const char *channel);