Re: [HACKERS] Anyone working on asynchronous NOTIFY reception?

From: Massimo Dal Zotto <dz(at)cs(dot)unitn(dot)it>
To: hackers(at)postgreSQL(dot)org (Pgsql Development)
Cc: tgl(at)sss(dot)pgh(dot)pa(dot)us (Tom Lane)
Subject: Re: [HACKERS] Anyone working on asynchronous NOTIFY reception?
Date: 1998-04-22 20:36:40
Message-ID: 199804222036.WAA00620@nikita.wizard.it
Lists: pgsql-hackers

>
> > The biggest problem is that if you have many clients listening on the same
> > thing they are signaled at the same time and all of them try to access the
> > pg_listener table for write. The result is that you have a lot of waits on
> > the table and sometimes also deadlocks if you don't do things carefully.
>
> Right, I recall seeing some things about that in the mailing list
> archives (from you, no doubt?). I had the impression that async.c
> had been changed to handle this better as of the current release.
> Is there still a problem?
>
> (Fortunately, I don't expect a *lot* of clients waiting on the same
> table, but deadlock would still be very bad news...)
>
> > From the Tcl side, a better solution would be to define a tcl event handler,
> > like the standard Tcl filehandler, which would be invoked automatically by
> > the Tk event loop or by tkwait if using pure Tcl.
>
> I agree.
>
> I don't have an immediate need for Tcl-based clients, so I was just
> going to revise libpq and libpq++. Do you want to redo libpgtcl?
> I'd probably get to that eventually, but splitting the work sounds
> better :-).

Not now, I am too busy.

> I'll post something later today about what the extensions to the
> libpq API should look like.
>
> > I have also some new patches which try to reduce the notify overhead by
> > avoiding unnecessary unlocks of the table. If you are interested I can
> > post them.
>
> Please do.
>
> regards, tom lane

This is the patch against 6.2.1p7; I haven't had the time to port it to 6.3.1.
The idea is to notify the other backends, and our own frontend if it is also
listening, while we still hold the write lock on pg_listener, i.e. before the
first CommitTransactionCommand(). Otherwise, if we must also notify our own
frontend, we will almost certainly get the lock back only after all the other
backends have processed the notify, which may take a long time.

Note, however, that releasing the lock before the end of the transaction
causes a small problem: you may get duplicate records in pg_listener if
several backends notify the same relation at the same time. I don't know why
this happens and haven't had time to investigate, so I wrote a quick hack in
Async_NotifyFrontEnd_Aux() to work around it (search for "notifyHack").

This is what I found in my pg_listener:

    mytable | 627 | 0
    mytable | 627 | 0
    mytable | 627 | 0

And this is the patch for 6.2.1p7:

*** async.c.orig Tue Jan 27 17:06:42 1998
--- async.c Thu Mar 19 01:09:49 1998
***************
*** 22,30 ****
* notification (we are notifying something that we are listening),
* signal the corresponding frontend over the comm channel using the
* out-of-band channel.
! * 2.b For all other listening processes, we send kill(2) to wake up
! * the listening backend.
! * 3. Upon receiving a kill(2) signal from another backend process notifying
* that one of the relation that we are listening is being notified,
* we can be in either of two following states:
* 3.a We are sleeping, wake up and signal our frontend.
--- 22,30 ----
* notification (we are notifying something that we are listening),
* signal the corresponding frontend over the comm channel using the
* out-of-band channel.
! * 2.b For all other listening processes, we send a SIGUSR2 signal
! * to wake up the listening backend.
! * 3. Upon receiving a SIGUSR2 signal from another backend process notifying
* that one of the relation that we are listening is being notified,
* we can be in either of two following states:
* 3.a We are sleeping, wake up and signal our frontend.
***************
*** 85,99 ****
#include <port-protos.h> /* for strdup() */

#include <storage/lmgr.h>

static int notifyFrontEndPending = 0;
static int notifyIssued = 0;
static Dllist *pendingNotifies = NULL;

-
static int AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
void Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);

--- 85,105 ----
#include <port-protos.h> /* for strdup() */

#include <storage/lmgr.h>
+ #include <utils/trace.h>
+
+ #define notifyUnlock pg_options[OPT_NOTIFYUNLOCK]
+ #define notifyHack pg_options[OPT_NOTIFYHACK]
+
+ GlobalMemory notifyContext = NULL;

static int notifyFrontEndPending = 0;
static int notifyIssued = 0;
static Dllist *pendingNotifies = NULL;

static int AsyncExistsPendingNotify(char *);
static void ClearPendingNotify(void);
static void Async_NotifyFrontEnd(void);
+ static void Async_NotifyFrontEnd_Aux(void);
void Async_Unlisten(char *relname, int pid);
static void Async_UnlistenOnExit(int code, char *relname);

***************
*** 121,145 ****
{
extern TransactionState CurrentTransactionState;

if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
!
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Waking up sleeping backend process");
! #endif
Async_NotifyFrontEnd();
-
}
else
{
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Process is in the middle of another transaction, state = %d, block state = %d",
! CurrentTransactionState->state,
! CurrentTransactionState->blockState);
! #endif
notifyFrontEndPending = 1;
}
}

/*
--- 127,152 ----
{
extern TransactionState CurrentTransactionState;

+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler");
+
if ((CurrentTransactionState->state == TRANS_DEFAULT) &&
(CurrentTransactionState->blockState == TRANS_DEFAULT))
{
! TPRINTF(TRACE_NOTIFY, "Waking up sleeping backend process");
Async_NotifyFrontEnd();
}
else
{
! TPRINTF(TRACE_NOTIFY,
! "Process is in the middle of another transaction, "
! "state = %d, block state = %d",
! CurrentTransactionState->state,
! CurrentTransactionState->blockState);
notifyFrontEndPending = 1;
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler: notify frontend pending");
}
+
+ TPRINTF(TRACE_NOTIFY, "Async_NotifyHandler done");
}

/*
***************
*** 184,192 ****

char *notifyName;

! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Async_Notify: %s", relname);
! #endif

if (!pendingNotifies)
pendingNotifies = DLNewList();
--- 191,197 ----

char *notifyName;

! TPRINTF(TRACE_NOTIFY, "Async_Notify: %s", relname);

if (!pendingNotifies)
pendingNotifies = DLNewList();
***************
*** 224,234 ****
heap_replace(lRel, &lTuple->t_ctid, rTuple);
}
ReleaseBuffer(b);
}
heap_endscan(sRel);
! RelationUnsetLockForWrite(lRel);
heap_close(lRel);
! notifyIssued = 1;
}

/*
--- 229,249 ----
heap_replace(lRel, &lTuple->t_ctid, rTuple);
}
ReleaseBuffer(b);
+ notifyIssued = 1;
}
heap_endscan(sRel);
!
! /*
! * Note: if we unset the lock we could get multiple tuples
! * with the same oid if other backends notify the same relation.
! */
! if (notifyUnlock) {
! RelationUnsetLockForWrite(lRel);
! }
!
heap_close(lRel);
!
! TPRINTF(TRACE_NOTIFY, "Async_Notify: done %s", relname);
}

/*
***************
*** 278,286 ****
{ /* 'notify <relname>' issued by us */
notifyIssued = 0;
StartTransactionCommand();
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Async_NotifyAtCommit.");
! #endif
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_notify,
Integer32EqualRegProcedure,
--- 293,299 ----
{ /* 'notify <relname>' issued by us */
notifyIssued = 0;
StartTransactionCommand();
! TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit");
ScanKeyEntryInitialize(&key, 0,
Anum_pg_listener_notify,
Integer32EqualRegProcedure,
***************
*** 303,318 ****

if (ourpid == DatumGetInt32(d))
{
- #ifdef ASYNC_DEBUG
- elog(DEBUG, "Notifying self, setting notifyFronEndPending to 1");
- #endif
notifyFrontEndPending = 1;
}
else
{
! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Notifying others");
! #endif
#ifdef HAVE_KILL
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
{
--- 316,330 ----

if (ourpid == DatumGetInt32(d))
{
notifyFrontEndPending = 1;
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyAtCommit notifying self");
}
else
{
! TPRINTF(TRACE_NOTIFY,
! "Async_NotifyAtCommit notifying %d",
! DatumGetInt32(d));
#ifdef HAVE_KILL
if (kill(DatumGetInt32(d), SIGUSR2) < 0)
{
***************
*** 327,344 ****
ReleaseBuffer(b);
}
heap_endscan(sRel);
- RelationUnsetLockForWrite(lRel);
heap_close(lRel);
-
- CommitTransactionCommand();
ClearPendingNotify();
- }

! if (notifyFrontEndPending)
! { /* we need to notify the frontend of all
! * pending notifies. */
! notifyFrontEndPending = 1;
! Async_NotifyFrontEnd();
}
}
}
--- 339,361 ----
ReleaseBuffer(b);
}
heap_endscan(sRel);
heap_close(lRel);
ClearPendingNotify();

! if (notifyFrontEndPending)
! {
! /* Notify the frontend inside the current transaction! */
! Async_NotifyFrontEnd_Aux();
! }
!
! TPRINTF(TRACE_NOTIFY, "Async_NotifyAtCommit done");
! CommitTransactionCommand();
! } else {
! /* Notify the frontend of pending notifies from other backends. */
! if (notifyFrontEndPending)
! {
! Async_NotifyFrontEnd();
! }
}
}
}
***************
*** 422,430 ****
char *relnamei;
TupleDesc tupDesc;

! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Async_Listen: %s", relname);
! #endif
for (i = 0; i < Natts_pg_listener; i++)
{
nulls[i] = ' ';
--- 439,445 ----
char *relnamei;
TupleDesc tupDesc;

! TPRINTF(TRACE_NOTIFY, "Async_Listen: %s", relname);
for (i = 0; i < Natts_pg_listener; i++)
{
nulls[i] = ' ';
***************
*** 457,462 ****
--- 472,480 ----
}
}
ReleaseBuffer(b);
+ if (alreadyListener) {
+ break;
+ }
}
heap_endscan(s);

***************
*** 464,485 ****
{
elog(NOTICE, "Async_Listen: We are already listening on %s",
relname);
return;
}

tupDesc = lDesc->rd_att;
! tup = heap_formtuple(tupDesc,
! values,
! nulls);
heap_insert(lDesc, tup);
-
pfree(tup);

- /*
- * if (alreadyListener) { elog(NOTICE,"Async_Listen: already one
- * listener on %s (possibly dead)",relname); }
- */
-
RelationUnsetLockForWrite(lDesc);
heap_close(lDesc);

--- 482,497 ----
{
elog(NOTICE, "Async_Listen: We are already listening on %s",
relname);
+ RelationUnsetLockForWrite(lDesc);
+ heap_close(lDesc);
return;
}

tupDesc = lDesc->rd_att;
! tup = heap_formtuple(tupDesc, values, nulls);
heap_insert(lDesc, tup);
pfree(tup);

RelationUnsetLockForWrite(lDesc);
heap_close(lDesc);

***************
*** 519,534 ****
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid),
0, 0);
- lDesc = heap_openr(ListenerRelationName);
- RelationSetLockForWrite(lDesc);
-
if (lTuple != NULL)
{
heap_delete(lDesc, &lTuple->t_ctid);
- }

! RelationUnsetLockForWrite(lDesc);
! heap_close(lDesc);
}

static void
--- 531,545 ----
lTuple = SearchSysCacheTuple(LISTENREL, PointerGetDatum(relname),
Int32GetDatum(pid),
0, 0);
if (lTuple != NULL)
{
+ lDesc = heap_openr(ListenerRelationName);
+ RelationSetLockForWrite(lDesc);
heap_delete(lDesc, &lTuple->t_ctid);

! RelationUnsetLockForWrite(lDesc);
! heap_close(lDesc);
! }
}

static void
***************
*** 560,570 ****
*
* --------------------------------------------------------------
*/
- GlobalMemory notifyContext = NULL;
-
static void
Async_NotifyFrontEnd()
{
extern CommandDest whereToSendOutput;
HeapTuple lTuple,
rTuple;
--- 571,595 ----
*
* --------------------------------------------------------------
*/
static void
Async_NotifyFrontEnd()
{
+ StartTransactionCommand();
+ Async_NotifyFrontEnd_Aux();
+ CommitTransactionCommand();
+ }
+
+ /*
+ * --------------------------------------------------------------
+ * Async_NotifyFrontEnd_Aux --
+ *
+ * Like Async_NotifyFrontEnd but MUST be called inside a transaction.
+ *
+ * --------------------------------------------------------------
+ */
+ static void
+ Async_NotifyFrontEnd_Aux()
+ {
extern CommandDest whereToSendOutput;
HeapTuple lTuple,
rTuple;
***************
*** 580,592 ****
int ourpid;
bool isnull;

! notifyFrontEndPending = 0;

! #ifdef ASYNC_DEBUG
! elog(DEBUG, "Async_NotifyFrontEnd: notifying front end.");
! #endif

! StartTransactionCommand();
ourpid = getpid();
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_notify,
--- 605,616 ----
int ourpid;
bool isnull;

! char *hack[32];
! int i, hack_count = 0;

! notifyFrontEndPending = 0;

! TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd");
ourpid = getpid();
ScanKeyEntryInitialize(&key[0], 0,
Anum_pg_listener_notify,
***************
*** 611,620 ****
--- 635,664 ----
{
d = heap_getattr(lTuple, b, Anum_pg_listener_relname,
tdesc, &isnull);
+
+ /* Hack to delete duplicate tuples (possible if notifyUnlock is set) */
+ if (notifyHack) {
+ for (i=0; i<hack_count; i++) {
+ if (strcmp(DatumGetName(d)->data, hack[i]) == 0) {
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyFrontEnd duplicate %s",
+ DatumGetName(d)->data);
+ heap_delete(lRel, &lTuple->t_ctid);
+ goto release_buffer;
+ }
+ }
+ if (hack_count < 32) {
+ hack[hack_count++] = pstrdup(DatumGetName(d)->data);
+ }
+ }
+
rTuple = heap_modifytuple(lTuple, b, lRel, value, nulls, repl);
heap_replace(lRel, &lTuple->t_ctid, rTuple);

/* notifying the front end */
+ TPRINTF(TRACE_NOTIFY,
+ "Async_NotifyFrontEnd notifying %s",
+ DatumGetName(d)->data);

if (whereToSendOutput == Remote)
{
***************
*** 625,635 ****
}
else
{
! elog(NOTICE, "Async_NotifyFrontEnd: no asynchronous notification to frontend on interactive sessions");
}
ReleaseBuffer(b);
}
! CommitTransactionCommand();
}

static int
--- 669,686 ----
}
else
{
! elog(NOTICE,
! "Async_NotifyFrontEnd: no asynchronous notification "
! "to frontend on interactive sessions");
}
+
+ release_buffer:
ReleaseBuffer(b);
}
! heap_endscan(sRel);
! RelationUnsetLockForWrite(lRel);
! heap_close(lRel);
! TPRINTF(TRACE_NOTIFY, "Async_NotifyFrontEnd done");
}

static int

Massimo Dal Zotto

+----------------------------------------------------------------------+
| Massimo Dal Zotto e-mail: dz(at)cs(dot)unitn(dot)it |
| Via Marconi, 141 phone: ++39-461-534251 |
| 38057 Pergine Valsugana (TN) www: http://www.cs.unitn.it/~dz/ |
| Italy pgp: finger dz(at)tango(dot)cs(dot)unitn(dot)it |
+----------------------------------------------------------------------+
