*** src/backend/storage/ipc/imsg.c	dc149eef487eafb43409a78b8a33c70e7d3c2bfa
--- src/backend/storage/ipc/imsg.c	dc149eef487eafb43409a78b8a33c70e7d3c2bfa
***************
*** 1,0 ****
--- 1,275 ----
+ /*-------------------------------------------------------------------------
+  *
+  * imsg.c
+  *
+  *	  Internal message passing for process to process communication
+  *    via shared memory.
+  *
+  * Copyright (c) 2005-2010, Markus Wanner
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #include <unistd.h>
+ #include <signal.h>
+ #include <string.h>
+ 
+ #ifdef HAVE_SYS_FILIO_H
+ #include <sys/filio.h>
+ #endif
+ 
+ #include <sys/ioctl.h>
+ 
+ #include "postgres.h"
+ #include "miscadmin.h"
+ #include "storage/proc.h"
+ #include "storage/procsignal.h"
+ #include "storage/imsg.h"
+ #include "storage/ipc.h"
+ #include "storage/shmem.h"
+ #include "storage/spin.h"
+ #include "utils/elog.h"
+ 
+ /* global variable pointing to the shmem area */
+ IMessageCtlData *IMessageCtl = NULL;
+ 
+ /*
+  * flag set by the signal handler, initialized to true to ensure
+  * IMessageCheck is called at least once after process startup.
+  */
+ static bool got_IMessage = false;
+ 
+ 
+ /*
+  * Initialization of shared memory for internal messages.
+  */
+ int
+ IMessageShmemSize(void)
+ {
+ 	return MAXALIGN(sizeof(IMessageCtlData) +
+ 					sizeof(Deque) * (MaxBackends - 1));
+ }
+ 
+ void
+ IMessageShmemInit(void)
+ {
+ 	bool		foundIMessageCtl;
+ 	int         i;
+ 
+ #ifdef IMSG_DEBUG
+ 	elog(DEBUG3, "IMessageShmemInit(): initializing shared memory");
+ #endif
+ 
+ 	IMessageCtl = (IMessageCtlData *)
+ 		ShmemInitStruct("IMsgCtl", IMessageShmemSize(),
+ 						&foundIMessageCtl);
+ 
+ 	if (foundIMessageCtl)
+ 		return;
+ 
+ 	/* initialize the per-backend message sink */
+ 	for (i = 0; i < MaxBackends; i++)
+ 		deque_init(&IMessageCtl->lists[i]);
+ }
+ 
+ char *
+ decode_imessage_type(const IMessageType msg_type)
+ {
+ 	switch (msg_type)
+ 	{
+ 		case IMSGT_TEST:
+ 			return "IMSGT_TEST";
+ 		default:
+ 			return "unknown message type";
+ 	}
+ }
+ 
+ /*
+  *   IMessageCreateInternal
+  *
+  * Creates a new but deactivated message within the queue, returning the
+  * message header of the newly created message or NULL if there's not enough
+  * space in shared memory for a message of the requested size.
+  */
+ static IMessage*
+ IMessageCreateInternal(IMessageType type, int msg_size)
+ {
+ 	IMessage *msg;
+ 
+ 	msg = (IMessage*) ShmemDynAlloc(sizeof(IMessage) + msg_size);
+ 
+ 	if (msg)
+ 	{
+ #ifdef MAGIC
+ 		msg->magic = MAGIC_VALUE;
+ 		ShmemDynCheck(msg);
+ #endif
+ 
+ 		msg->sender = InvalidBackendId;
+ 		msg->type = type;
+ 		msg->size = msg_size;
+ 
+ #ifdef IMSG_DEBUG
+ 		elog(DEBUG3, "IMessageCreateInternal(): created message type %s of size %d at %p",
+ 			 decode_imessage_type(type), (int) msg_size, msg);
+ #endif
+ 	}
+ 
+ 	return msg;
+ }
+ 
+ /*
+  *   IMessageCreate
+  *
+  * Creates a new but deactivated message within the queue, returning the
+  * message header of the newly created message. Blocks until there is
+  * enough space available for the message in shared memory, retrying every
+  * 100ms.
+  *
+  * FIXME: this is not the best way to handle out of (imessage) memory, as
+  *        the process wanting to create an IMessage may well receive more
+  *        imessages while it is waiting to create a new one (previously
+  *        created, but not activated ones). However, for most callers this
+  *        would mean having to cache the message in local memory until its
+  *        deliverable.
+  */
+ IMessage*
+ IMessageCreate(IMessageType type, int msg_size)
+ {
+ 	IMessage *msg;
+ 
+ 	while (!(msg = IMessageCreateInternal(type, msg_size)))
+ 	{
+ 		elog(WARNING, "imessage: waiting for %d bytes to be freed",
+ 			 (int) sizeof(IMessage) + msg_size);
+ 
+ 		pg_usleep(100000);
+ 	}
+ 
+ 	return msg;
+ }
+ 
+ int
+ IMessageActivate(IMessage *msg, BackendId recipient)
+ {
+ 	Assert(msg);
+ 	Assert(recipient >= 0);
+ 	Assert(recipient < MaxBackends);
+ #ifdef MAGIC
+ 	Assert(msg->magic == MAGIC_VALUE);
+ 	ShmemDynCheck(msg);
+ #endif
+ 
+ #ifdef IMSG_DEBUG
+ 	elog(DEBUG3, "IMessageActivate(): activating message of type %s and size %d for recipient %d",
+ 		 decode_imessage_type(msg->type), msg->size, recipient);
+ #endif
+ 
+ 	START_CRIT_SECTION();
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		IMessageCtlData *imsgctl = IMessageCtl;
+ 		Deque *list = &imsgctl->lists[recipient];
+ 
+ 		SpinLockAcquire(&list->lock);
+ 
+ 		msg->sender = MyBackendId;
+ 
+ 		deque_enqueue(list, &msg->list);
+ 
+ 		SpinLockRelease(&list->lock);
+ 	}
+ 	END_CRIT_SECTION();
+ 
+ 	return SendProcSignalById(PROCSIG_IMSG_INTERRUPT, recipient);
+ }
+ 
+ /*
+  *   IMessageRemove
+  *
+  * Marks a message as removable by setting the recipient to null. The message
+  * will eventually be removed during creation of new messages, see
+  * IMessageCreate().
+  */
+ void
+ IMessageRemove(IMessage *msg)
+ {
+ 	Assert(msg);
+ #ifdef MAGIC
+ 	Assert(msg->magic == MAGIC_VALUE);
+ #endif
+ 
+ 	ShmemDynFree((Pointer) msg);
+ }
+ 
+ /*
+  *    HandleIMessageInterrupt
+  *
+  * Called on PROCSIG_IMSGT_INTERRUPT, possibly within the signal handler.
+  */
+ void
+ HandleIMessageInterrupt()
+ {
+ 	got_IMessage = true;
+ }
+ 
+ /*
+  *   IMessageCheck
+  *
+  * Checks if there is a message in the queue for this process. Returns null
+  * if there is no message for this process, the message header otherwise. The
+  * message remains in the queue and should be removed by IMessageRemove().
+  */
+ IMessage*
+ IMessageCheck(void)
+ {
+ 	IMessage	   *msg;
+ 
+ 	/* short circuit in case we didn't receive a signal */
+ 	if (!got_IMessage)
+ 		return NULL;
+ 
+ #ifdef IMSG_DEBUG
+ 	elog(DEBUG3, "IMessageCheck(): backend %d (pid %d) got imsg interrupt",
+ 		 MyBackendId, MyProc->pid);
+ #endif
+ 
+ 	msg = NULL;
+ 	START_CRIT_SECTION();
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		IMessageCtlData *imsgctl = IMessageCtl;
+ 		Deque *list = &imsgctl->lists[MyBackendId];
+ 
+ 		SpinLockAcquire(&list->lock);
+ 
+ 		msg = (IMessage*) deque_dequeue(list);
+ 
+ 		SpinLockRelease(&list->lock);
+ 	}
+ 	END_CRIT_SECTION();
+ 
+ 	/*
+ 	 * Reset the flag, if we scanned through the list but didn't find any
+ 	 * new message.
+ 	 */
+ 	if (msg == NULL)
+ 		got_IMessage = false;
+ 
+ #ifdef MAGIC
+ 	if (msg != NULL)
+ 	{
+ 		Assert(msg->magic == MAGIC_VALUE);
+ 		ShmemDynCheck(msg);
+ 	}
+ #endif
+ 
+ #ifdef IMSG_DEBUG
+ 	if (msg != NULL)
+ 		elog(DEBUG3, "IMessageCheck(): new message at %p of type %s and size %d for [%d/%d]",
+ 			 msg, decode_imessage_type(msg->type), msg->size,
+ 			 MyBackendId, MyProcPid);
+ #endif
+ 
+ 	return msg;
+ }
============================================================
*** src/include/storage/imsg.h	d7d3960f42396bef49eb598411c215271570330e
--- src/include/storage/imsg.h	d7d3960f42396bef49eb598411c215271570330e
***************
*** 1,0 ****
--- 1,76 ----
+ /*-------------------------------------------------------------------------
+  *
+  * imsg.h
+  *
+  *	  Internal message passing for process to process communication
+  *    via shared memory.
+  *
+  * Copyright (c) 2005-2010, Markus Wanner
+  *
+  *-------------------------------------------------------------------------
+  */
+ 
+ #ifndef IMSG_H
+ #define IMSG_H
+ 
+ #include <sys/types.h>
+ 
+ #include "c.h"
+ #include "storage/backendid.h"
+ #include "storage/shmem.h"
+ #include "storage/spin.h"
+ 
+ /* get a data pointer from the header */
+ #define IMSG_DATA(imsg) ((Pointer) ((Pointer) imsg + sizeof(IMessage)))
+ 
+ /*
+  * Message types
+  */
+ typedef enum
+ {
+ 	IMSGT_TEST = 'T'				/* test message type */
+ } IMessageType;
+ 
+ /*
+  * Message descriptor in front of the message
+  */
+ typedef struct
+ {
+ 	DequeElem       list;
+ 
+ #ifdef MAGIC
+ 	uint64          magic;
+ #endif
+ 
+ 	/* backend id of the sender, null means not yet activated message */
+ 	BackendId	    sender;
+ 
+ 	/* message type */
+ 	IMessageType    type;
+ 
+ 	/* message size following, but not including this header */
+ 	int		    	size;
+ } IMessage;
+ 
+ /*
+  * shared-memory pool for internal messages.
+  */
+ typedef struct
+ {
+ 	/* a singly linked list per backend */
+ 	Deque lists[1];
+ } IMessageCtlData;
+ 
+ /* routines to send and receive internal messages */
+ extern int IMessageShmemSize(void);
+ extern void IMessageShmemInit(void);
+ extern IMessage* IMessageCreate(IMessageType type, int msg_size);
+ extern int IMessageActivate(IMessage *msg, BackendId recipient);
+ extern void IMessageRemove(IMessage *msg);
+ extern void HandleIMessageInterrupt(void);
+ extern IMessage* IMessageCheck(void);
+ 
+ /* mainly for debugging purposes */
+ extern char* decode_imessage_type(const IMessageType msg_type);
+ 
+ #endif   /* IMSG_H */
============================================================
*** src/backend/storage/ipc/Makefile	0610a969a64989edaa2c19f9c25362bd888bdeb9
--- src/backend/storage/ipc/Makefile	c1e3d9da5c3fa8e27f25f03c0cdd5b8149fb5a4c
*************** endif
*** 15,21 ****
  endif
  endif
  
! OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \
! 	sinval.o sinvaladt.o standby.o wamalloc.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 15,21 ----
  endif
  endif
  
! OBJS = imsg.o ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o \
! 	shmqueue.o sinval.o sinvaladt.o standby.o wamalloc.o
  
  include $(top_srcdir)/src/backend/common.mk
============================================================
*** src/backend/storage/ipc/ipci.c	2c2131711f38125a39ec0c9dfaef970ca873d142
--- src/backend/storage/ipc/ipci.c	1e5026e411d3728825b8e18dc01a9a0ec9ce974d
***************
*** 29,34 ****
--- 29,35 ----
  #include "replication/walreceiver.h"
  #include "replication/walsender.h"
  #include "storage/bufmgr.h"
+ #include "storage/imsg.h"
  #include "storage/ipc.h"
  #include "storage/pg_shmem.h"
  #include "storage/pmsignal.h"
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 125,130 ****
--- 126,132 ----
  		size = add_size(size, BTreeShmemSize());
  		size = add_size(size, SyncScanShmemSize());
  		size = add_size(size, AsyncShmemSize());
+ 		size = add_size(size, IMessageShmemSize());
  #ifdef EXEC_BACKEND
  		size = add_size(size, ShmemBackendArraySize());
  #endif
*************** CreateSharedMemoryAndSemaphores(bool mak
*** 234,239 ****
--- 236,242 ----
  	BTreeShmemInit();
  	SyncScanShmemInit();
  	AsyncShmemInit();
+ 	IMessageShmemInit();
  
  #ifdef EXEC_BACKEND
  
============================================================
*** src/backend/storage/ipc/procsignal.c	0d72e226ad1e335340a408bdf250c871ee963e7e
--- src/backend/storage/ipc/procsignal.c	a86dc97e08f3efa9f69dda1d6815aff0156bf0dc
***************
*** 20,25 ****
--- 20,26 ----
  #include "bootstrap/bootstrap.h"
  #include "commands/async.h"
  #include "miscadmin.h"
+ #include "storage/imsg.h"
  #include "storage/ipc.h"
  #include "storage/procsignal.h"
  #include "storage/shmem.h"
*************** SendProcSignal(pid_t pid, ProcSignalReas
*** 224,229 ****
--- 225,253 ----
  }
  
  /*
+  * SendProcSignalById
+  *		Send a signal to a Postgres process identified by its backend id.
+  *
+  * This variation of SendProcSignal doesn't check the process id, thus it's
+  * chance of signaling the wrong backend is even higher. Users need to cope
+  * with signals erroneously delivered to a newly started backend.
+  */
+ int
+ SendProcSignalById(ProcSignalReason reason, BackendId backendId)
+ {
+ 	volatile ProcSignalSlot *slot;
+ 
+ 	Assert(backendId != InvalidBackendId);
+ 
+ 	slot = &ProcSignalSlots[backendId - 1];
+ 
+ 	/* Atomically set the proper flag */
+ 	slot->pss_signalFlags[reason] = true;
+ 	/* Send signal */
+ 	return kill(slot->pss_pid, SIGUSR1);
+ }
+ 
+ /*
   * CheckProcSignal - check to see if a particular reason has been
   * signaled, and clear the signal flag.  Should be called after receiving
   * SIGUSR1.
*************** procsignal_sigusr1_handler(SIGNAL_ARGS)
*** 278,282 ****
--- 302,309 ----
  	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
  		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
  
+ 	if (CheckProcSignal(PROCSIG_IMSG_INTERRUPT))
+ 		HandleIMessageInterrupt();
+ 
  	errno = save_errno;
  }
============================================================
*** src/include/storage/procsignal.h	371d9c8b77490ffca09bd5227e658af452dae5b4
--- src/include/storage/procsignal.h	dcecd1a312db4fe0aff6cb6b5037e109677bfc8b
*************** typedef enum
*** 31,36 ****
--- 31,37 ----
  {
  	PROCSIG_CATCHUP_INTERRUPT,	/* sinval catchup interrupt */
  	PROCSIG_NOTIFY_INTERRUPT,	/* listen/notify interrupt */
+ 	PROCSIG_IMSG_INTERRUPT,		/* internal message received */
  
  	/* Recovery conflict reasons */
  	PROCSIG_RECOVERY_CONFLICT_DATABASE,
*************** extern int SendProcSignal(pid_t pid, Pro
*** 52,57 ****
--- 53,59 ----
  extern void ProcSignalInit(int pss_idx);
  extern int SendProcSignal(pid_t pid, ProcSignalReason reason,
  			   BackendId backendId);
+ extern int  SendProcSignalById(ProcSignalReason reason, BackendId backendId);
  
  extern void procsignal_sigusr1_handler(SIGNAL_ARGS);
  
