Index: src/backend/postmaster/autovacuum.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/postmaster/autovacuum.c,v
retrieving revision 1.34
diff -c -p -r1.34 autovacuum.c
*** src/backend/postmaster/autovacuum.c	13 Mar 2007 00:33:41 -0000	1.34
--- src/backend/postmaster/autovacuum.c	21 Mar 2007 22:17:01 -0000
***************
*** 52,57 ****
--- 52,58 ----
  #include "utils/syscache.h"
  
  
+ static volatile sig_atomic_t got_SIGUSR1 = false;
  static volatile sig_atomic_t got_SIGHUP = false;
  static volatile sig_atomic_t avlauncher_shutdown_request = false;
  
*************** static volatile sig_atomic_t avlauncher_
*** 59,64 ****
--- 60,66 ----
   * GUC parameters
   */
  bool		autovacuum_start_daemon = false;
+ int			autovacuum_max_workers;
  int			autovacuum_naptime;
  int			autovacuum_vac_thresh;
  double		autovacuum_vac_scale;
*************** int			autovacuum_freeze_max_age;
*** 69,75 ****
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flag to tell if we are in the autovacuum daemon process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
--- 71,77 ----
  int			autovacuum_vac_cost_delay;
  int			autovacuum_vac_cost_limit;
  
! /* Flags to tell if we are in an autovacuum launcher or worker process */
  static bool am_autovacuum_launcher = false;
  static bool am_autovacuum_worker = false;
  
*************** static MemoryContext AutovacMemCxt;
*** 85,116 ****
  /* struct to keep list of candidate databases for vacuum */
  typedef struct autovac_dbase
  {
! 	Oid			oid;
! 	char	   *name;
! 	TransactionId frozenxid;
! 	PgStat_StatDBEntry *entry;
  } autovac_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze */
  typedef struct autovac_table
  {
! 	Oid			relid;
! 	Oid			toastrelid;
! 	bool		dovacuum;
! 	bool		doanalyze;
! 	int			freeze_min_age;
! 	int			vacuum_cost_delay;
! 	int			vacuum_cost_limit;
  } autovac_table;
  
  typedef struct
  {
! 	Oid		process_db;			/* OID of database to process */
! 	int		worker_pid;			/* PID of the worker process, if any */
  } AutoVacuumShmemStruct;
  
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
--- 87,147 ----
  /* struct to keep list of candidate databases for vacuum */
  typedef struct autovac_dbase
  {
! 	Oid			ad_datid;			/* must be first (used as hashtable key) */
! 	TimestampTz	ad_next_worker;		/* when to start the next worker here */
! 	char	   *ad_name;			/* database name (pstrdup'd copy) */
! 	TransactionId ad_frozenxid;		/* frozen XID read from the flat file */
! 	PgStat_StatDBEntry *ad_entry;	/* pgstat entry; NULL if none or not fetched */
  } autovac_dbase;
  
  /* struct to keep track of tables to vacuum and/or analyze */
  typedef struct autovac_table
  {
! 	Oid			at_relid;
! 	Oid			at_toastrelid;
! 	bool		at_dovacuum;
! 	bool		at_doanalyze;
! 	int			at_freeze_min_age;
! 	int			at_vacuum_cost_delay;
! 	int			at_vacuum_cost_limit;
  } autovac_table;
  
+ /*-------------
+  * This struct holds information about a single worker's whereabouts.  We keep
+  * an array of these in shared memory, sized according to
+  * autovacuum_max_workers.
+  *
+  * wi_dboid		OID of the database this worker is supposed to work on
+  * wi_tableoid	OID of the table currently being vacuumed
+  * wi_workerpid	PID of the running worker, 0 if not yet started
+  * wi_finished	True when the worker is done and about to exit
+  *-------------
+  */
+ typedef struct
+ {
+ 	Oid			wi_dboid;
+ 	Oid			wi_tableoid;
+ 	int			wi_workerpid;
+ 	bool		wi_finished;
+ } WorkerInfo;
+ 
  typedef struct
  {
! 	pid_t		av_launcherpid;
! 	WorkerInfo	av_workers[1];
! 	/* VARIABLE LENGTH STRUCT */
  } AutoVacuumShmemStruct;
  
+ /* Iterate over all worker slots (one for statement).  Args are evaluated repeatedly! */
+ #define foreach_worker(_i, _worker) \
+ 	for ((_i) = 0, (_worker) = AutoVacuumShmem->av_workers; \
+ 		 (_i) < autovacuum_max_workers; \
+ 		 (_i)++, (_worker)++)
+ 
  static AutoVacuumShmemStruct *AutoVacuumShmem;
  
+ static int free_workers;
+ 
  #ifdef EXEC_BACKEND
  static pid_t avlauncher_forkexec(void);
  static pid_t avworker_forkexec(void);
*************** static pid_t avworker_forkexec(void);
*** 118,139 ****
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static void do_autovacuum(PgStat_StatDBEntry *dbentry);
! static List *autovac_get_database_list(void);
  static void test_rel_for_autovac(Oid relid, PgStat_StatTabEntry *tabentry,
  					 Form_pg_class classForm,
  					 Form_pg_autovacuum avForm,
  					 List **vacuum_tables,
  					 List **toast_table_ids);
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
  
- 
  /********************************************************************
   *                    AUTOVACUUM LAUNCHER CODE
   ********************************************************************/
--- 149,183 ----
  NON_EXEC_STATIC void AutoVacWorkerMain(int argc, char *argv[]);
  NON_EXEC_STATIC void AutoVacLauncherMain(int argc, char *argv[]);
  
! static Oid do_start_worker(Dllist *database_list);
! static int launcher_determine_sleep(Dllist *database_list);
! static Dllist *launch_worker(Dllist *database_list, TimestampTz now);
! static List *get_database_list(void);
! static Dllist *get_database_dllist(Dllist *prevlist, Oid newdb);
! static int avdb_comparator(const void *a, const void *b);
! 
! static void do_autovacuum(WorkerInfo *worker, Oid dbid);
! static HeapTuple get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid);
  static void test_rel_for_autovac(Oid relid, PgStat_StatTabEntry *tabentry,
  					 Form_pg_class classForm,
  					 Form_pg_autovacuum avForm,
  					 List **vacuum_tables,
  					 List **toast_table_ids);
+ static bool relation_needs_autovacuum(Oid relid, PgStat_StatTabEntry *tabentry,
+ 						  Form_pg_class classForm,
+ 						  Form_pg_autovacuum avForm,
+ 						  autovac_table **tab);
+ static autovac_table *recheck_autovac_condition(Oid relid);
+ static void filter_table_list(List **vacuum_tables, List **toast_table_ids);
  static void autovacuum_do_vac_analyze(Oid relid, bool dovacuum,
  						  bool doanalyze, int freeze_min_age);
  static void autovac_report_activity(VacuumStmt *vacstmt, Oid relid);
  static void avl_sighup_handler(SIGNAL_ARGS);
+ static void avl_sigusr1_handler(SIGNAL_ARGS);
  static void avlauncher_shutdown(SIGNAL_ARGS);
  static void avl_quickdie(SIGNAL_ARGS);
  
  
  /********************************************************************
   *                    AUTOVACUUM LAUNCHER CODE
   ********************************************************************/
*************** StartAutoVacLauncher(void)
*** 212,226 ****
  
  /*
   * Main loop for the autovacuum launcher process.
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
! 	sigjmp_buf	local_sigjmp_buf;
! 	List	   *dblist;
! 	bool		for_xid_wrap;
! 	autovac_dbase *db;
  	MemoryContext	avlauncher_cxt;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 256,302 ----
  
  /*
   * Main loop for the autovacuum launcher process.
+  *
+  * The signalling between launcher and worker is as follows:
+  *
+  * When the worker has finished starting up, it stores its PID in wi_workerpid
+  * and sends a SIGUSR1 signal to the launcher.  The launcher then knows that
+  * the postmaster is ready to start a new worker.  We do it this way because
+  * otherwise we risk calling SendPostmasterSignal() when the postmaster hasn't
+  * yet processed the last one, in which case the second signal would be lost.
+  * This is only useful when two workers need to be started close to one
+  * another, which should be rare but it's possible.
+  *
+  * Additionally, when the worker is finished with the vacuum work, it sets the
+  * wi_finished flag and sends a SIGUSR1 signal to the launcher.  Upon receipt
+  * of this signal, the launcher then clears the entry for future use and may
+  * start another worker right away, if need be.
+  *
+  * Note that there is still a race condition here, because a worker may finish
+  * just after the launcher sent the signal to postmaster, but before postmaster
+  * processes it.  At this point, the launcher receives a signal, sees the empty
+  * slot, so it sends the postmaster the signal again to start another worker.
+  * But the postmaster flag was already set, so the signal is lost.  To avoid
+  * this problem, the launcher will not try to start a new worker until all
+  * WorkerInfo entries that have the wi_dboid field set have a PID assigned.
+  * (FIXME -- this part is not yet implemented.)
+  *
+  * There is an additional problem if, for some reason, a worker starts and
+  * is not able to finish its task correctly.  It will not be able to set its
+  * finished flag, so the launcher will believe that it's still starting up.
+  * To prevent this problem, we check the PGPROCs of worker processes, and
+  * clean them up if we find they are not actually running (or they correspond
+  * to processes that are not autovacuum workers.)  We only do it if all 
+  * WorkerInfo structures are in use, thus frequently enough so that this
+  * problem doesn't cause any starvation, but seldom enough so that it's not a
+  * performance hit.  (FIXME -- this part is not yet implemented.)
   */
  NON_EXEC_STATIC void
  AutoVacLauncherMain(int argc, char *argv[])
  {
! 	sigjmp_buf		local_sigjmp_buf;
  	MemoryContext	avlauncher_cxt;
+ 	Dllist		   *database_list;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacLauncherMain(int argc, char *argv
*** 249,257 ****
  	 * Set up signal handlers.	Since this is an auxiliary process, it has
  	 * particular signal requirements -- no deadlock checker or sinval
  	 * catchup, for example.
- 	 *
- 	 * XXX It may be a good idea to receive signals when an avworker process
- 	 * finishes.
  	 */
  	pqsignal(SIGHUP, avl_sighup_handler);
  
--- 325,330 ----
*************** AutoVacLauncherMain(int argc, char *argv
*** 261,267 ****
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, SIG_IGN);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
--- 334,340 ----
  	pqsignal(SIGALRM, SIG_IGN);
  
  	pqsignal(SIGPIPE, SIG_IGN);
! 	pqsignal(SIGUSR1, avl_sigusr1_handler);
  	/* We don't listen for async notifies */
  	pqsignal(SIGUSR2, SIG_IGN);
  	pqsignal(SIGFPE, FloatExceptionHandler);
*************** AutoVacLauncherMain(int argc, char *argv
*** 343,365 ****
  	/* We can now handle ereport(ERROR) */
  	PG_exception_stack = &local_sigjmp_buf;
  
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
! 	PG_SETMASK(&UnBlockSig);
  
  	/*
! 	 * take a nap before executing the first iteration, unless we were
! 	 * requested an emergency run.
  	 */
! 	if (autovacuum_start_daemon)
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  
  	for (;;)
  	{
! 		TransactionId xidForceLimit;
! 		ListCell *cell;
! 		int		worker_pid;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
--- 416,448 ----
  	/* We can now handle ereport(ERROR) */
  	PG_exception_stack = &local_sigjmp_buf;
  
+ 	/* in emergency mode, just start a worker and go away */
+ 	if (!autovacuum_start_daemon)
+ 	{
+ 		do_start_worker(NULL);
+ 		proc_exit(0);		/* done */
+ 	}
+ 
  	ereport(LOG,
  			(errmsg("autovacuum launcher started")));
  
! 	AutoVacuumShmem->av_launcherpid = MyProcPid;
  
  	/*
! 	 * Create the initial database list.  The invariant we want this list to
! 	 * keep is that it's ordered by decreasing next_time; initially all times
! 	 * are zero, so order doesn't matter.  As soon as an entry is updated to
! 	 * a higher time, it will be moved to the front (which is correct because
! 	 * the only operation is to add autovacuum_naptime to the entry, and time
! 	 * always increases).
  	 */
! 	database_list = get_database_dllist(NULL, InvalidOid);
! 	free_workers = autovacuum_max_workers;
  
+ 	PG_SETMASK(&UnBlockSig);
  	for (;;)
  	{
! 		int		millis;
  
  		/*
  		 * Emergency bailout if postmaster has died.  This is to avoid the
*************** AutoVacLauncherMain(int argc, char *argv
*** 368,373 ****
--- 451,462 ----
  		if (!PostmasterIsAlive(true))
  			exit(1);
  
+ 		millis = launcher_determine_sleep(database_list);
+ 
+ 		/* Sleep according to schedule; multiply as long to avoid int overflow */
+ 		pg_usleep(millis * 1000L);
+ 
+ 		/* the normal shutdown case */
  		if (avlauncher_shutdown_request)
  			break;
  
*************** AutoVacLauncherMain(int argc, char *argv
*** 377,516 ****
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
  		/*
! 		 * if there's a worker already running, sleep until it
! 		 * disappears.
  		 */
! 		LWLockAcquire(AutovacuumLock, LW_SHARED);
! 		worker_pid = AutoVacuumShmem->worker_pid;
! 		LWLockRelease(AutovacuumLock);
  
! 		if (worker_pid != 0)
  		{
! 			PGPROC *proc = BackendPidGetProc(worker_pid);
  
! 			if (proc != NULL && proc->isAutovacuum)
! 				goto sleep;
  			else
  			{
  				/*
! 				 * if the worker is not really running (or it's a process
! 				 * that's not an autovacuum worker), remove the PID from shmem.
! 				 * This should not happen, because either the worker exits
! 				 * cleanly, in which case it'll remove the PID, or it dies, in
! 				 * which case postmaster will cause a system reset cycle.
  				 */
! 				LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 				worker_pid = 0;
! 				LWLockRelease(AutovacuumLock);
  			}
  		}
  
! 		/* Get a list of databases */
! 		dblist = autovac_get_database_list();
  
! 		/*
! 		 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
! 		 * to pass without forcing a vacuum.  (This limit can be tightened for
! 		 * particular tables, but not loosened.)
! 		 */
! 		recentXid = ReadNewTransactionId();
! 		xidForceLimit = recentXid - autovacuum_freeze_max_age;
! 		/* ensure it's a "normal" XID, else TransactionIdPrecedes misbehaves */
! 		if (xidForceLimit < FirstNormalTransactionId)
! 			xidForceLimit -= FirstNormalTransactionId;
  
! 		/*
! 		 * Choose a database to connect to.  We pick the database that was least
! 		 * recently auto-vacuumed, or one that needs vacuuming to prevent Xid
! 		 * wraparound-related data loss.  If any db at risk of wraparound is
! 		 * found, we pick the one with oldest datfrozenxid, independently of
! 		 * autovacuum times.
! 		 *
! 		 * Note that a database with no stats entry is not considered, except for
! 		 * Xid wraparound purposes.  The theory is that if no one has ever
! 		 * connected to it since the stats were last initialized, it doesn't need
! 		 * vacuuming.
! 		 *
! 		 * XXX This could be improved if we had more info about whether it needs
! 		 * vacuuming before connecting to it.  Perhaps look through the pgstats
! 		 * data for the database's tables?  One idea is to keep track of the
! 		 * number of new and dead tuples per database in pgstats.  However it
! 		 * isn't clear how to construct a metric that measures that and not cause
! 		 * starvation for less busy databases.
! 		 */
! 		db = NULL;
! 		for_xid_wrap = false;
! 		foreach(cell, dblist)
! 		{
! 			autovac_dbase *tmp = lfirst(cell);
  
- 			/* Find pgstat entry if any */
- 			tmp->entry = pgstat_fetch_stat_dbentry(tmp->oid);
  
! 			/* Check to see if this one is at risk of wraparound */
! 			if (TransactionIdPrecedes(tmp->frozenxid, xidForceLimit))
! 			{
! 				if (db == NULL ||
! 					TransactionIdPrecedes(tmp->frozenxid, db->frozenxid))
! 					db = tmp;
! 				for_xid_wrap = true;
! 				continue;
! 			}
! 			else if (for_xid_wrap)
! 				continue;			/* ignore not-at-risk DBs */
  
! 			/*
! 			 * Otherwise, skip a database with no pgstat entry; it means it
! 			 * hasn't seen any activity.
! 			 */
! 			if (!tmp->entry)
! 				continue;
  
  			/*
! 			 * Remember the db with oldest autovac time.  (If we are here,
! 			 * both tmp->entry and db->entry must be non-null.)
  			 */
! 			if (db == NULL ||
! 				tmp->entry->last_autovac_time < db->entry->last_autovac_time)
! 				db = tmp;
  		}
  
! 		/* Found a database -- process it */
! 		if (db != NULL)
  		{
! 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 			AutoVacuumShmem->process_db = db->oid;
! 			LWLockRelease(AutovacuumLock);
  
! 			SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
  		}
! 		
! sleep:
  		/*
! 		 * in emergency mode, exit immediately so that the postmaster can
! 		 * request another run right away if needed.
! 		 *
! 		 * XXX -- maybe it would be better to handle this inside the launcher
! 		 * itself.
  		 */
! 		if (!autovacuum_start_daemon)
! 			break;
  
! 		/* have pgstat read the file again next time */
! 		pgstat_clear_snapshot();
  
! 		/* now sleep until the next autovac iteration */
! 		pg_usleep(autovacuum_naptime * 1000000L); 
  	}
  
! 	/* Normal exit from the autovac launcher is here */
! 	ereport(LOG,
! 			(errmsg("autovacuum launcher shutting down")));
  
! 	proc_exit(0);		/* done */
  }
  
  /* SIGHUP: set flag to re-read config file at next convenient time */
  static void
  avl_sighup_handler(SIGNAL_ARGS)
--- 466,1044 ----
  			ProcessConfigFile(PGC_SIGHUP);
  		}
  
+ 		/* a worker started up or finished */
+ 		if (got_SIGUSR1)
+ 		{
+ 			WorkerInfo *worker;
+ 			int			i;
+ 
+ 			got_SIGUSR1 = false;
+ 
+ 			/* Walk the workers and clean up finished entries. */
+ 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);	
+ 			foreach_worker(i, worker)
+ 			{
+ 				if (worker->wi_finished)
+ 				{
+ 					worker->wi_tableoid = InvalidOid;
+ 					worker->wi_dboid = InvalidOid;
+ 					worker->wi_workerpid = 0;
+ 					worker->wi_finished = false;
+ 					free_workers++;
+ 				}
+ 			}
+ 			LWLockRelease(AutovacuumLock);
+ 		}
+ 
+ #if 0
  		/*
! 		 * Find and remove all entries corresponding to workers that failed to
! 		 * start.  Problem: how do we detect that it failed to start, yet leave
! 		 * alone those that are still really starting up?
! 		 *
! 		 * Idea: if we find that a database is listed twice, and none of the
! 		 * workers has registered, then something is wrong.  This fails if
! 		 * all workers failed in different databases however.
! 		 *
! 		 * Another idea: wreak havoc if a worker was started longer than
! 		 * autovac_naptime seconds ago and still hasn't registered.
  		 */
! 		if (free_workers == 0)
! 		{
! 			LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 			foreach_worker(i, worker)
! 			{
! 				if (worker->wi_workerpid == 0 && ...)
! 					clear the worker entry
! 			}
! 			LWLockRelease(AutovacuumLock);
! 		}
! #endif
  
! 		/*
! 		 * See if there's need to start a new worker, and do so if possible.
! 		 * If there are no free worker slots, avoid doing all this work, as
! 		 * we will not be able to start the worker anyway.
! 		 */
! 		if (free_workers > 0)
  		{
! 			TimestampTz	current_time;
! 			Dlelem	   *elem;
  
! 			elem = DLGetTail(database_list);
! 
! 			current_time = GetCurrentTimestamp();
! 
! 			if (elem != NULL)
! 			{
! 				autovac_dbase *avdb = DLE_VAL(elem);
! 				long	secs;
! 				int		usecs;
! 
! 				TimestampDifference(current_time, avdb->ad_next_worker, &secs, &usecs);
! 
! 				/* we have to start a worker now */
! 				if (secs <= 0 && usecs <= 0)
! 					database_list = launch_worker(database_list, current_time);
! 			}
  			else
  			{
  				/*
! 				 * Special case when the list is empty: start a worker right
! 				 * away.  This covers the initial case, when no database is in
! 				 * pgstats (thus the list is empty).
  				 */
! 				database_list = launch_worker(database_list, current_time);
  			}
  		}
  
! 		/* have pgstat read the file again next time */
! 		pgstat_clear_snapshot();
! 	}
  
! 	/* Normal exit from the autovac launcher is here */
! 	ereport(LOG,
! 			(errmsg("autovacuum launcher shutting down")));
! 	AutoVacuumShmem->av_launcherpid = 0;
  
! 	proc_exit(0);		/* done */
! }
  
  
! /*
!  * Determine the time to sleep, in milliseconds, based on the database list.
!  */
! static int
! launcher_determine_sleep(Dllist *database_list)
! {
! 	long	secs;
! 	int		usecs;
! 	Dlelem *elem;
  
! 	/*
! 	 * We sleep until the next scheduled vacuum.  We trust that when the
! 	 * database list was built, care was taken so that no entries have
! 	 * negative times; if the tail entry (the one examined here) has a too
! 	 * close next_worker value, we will sleep a small nominal time.
! 	 */
! 	elem = DLGetTail(database_list);
! 	if (elem != NULL)
! 	{
! 		autovac_dbase  *avdb = DLE_VAL(elem);
! 		TimestampTz		current_time = GetCurrentTimestamp();
! 		TimestampTz		next_wakeup;
  
+ 		next_wakeup = avdb->ad_next_worker;
+ 		TimestampDifference(current_time, next_wakeup, &secs, &usecs);
+ 	}
+ 	else
+ 	{
+ 		/* list is empty, sleep for the whole autovacuum_naptime seconds */
+ 		secs = autovacuum_naptime;
+ 		usecs = 0;
+ 	}
+ 	/*
+ 	 * no time left to wait (entry due or invalid); sleep a nominal amount
+ 	 */
+ 	if (secs <= 0L && usecs <= 0)
+ 	{
+ 		secs = 0;
+ 		usecs = 500000;	/* 500 ms */
+ 	}
+ 
+ 	return secs * 1000 + usecs / 1000;	/* convert to milliseconds */
+ }
+ 
+ /*
+  * Build and return a database list as a doubly-linked list.  This list
+  * only contains databases that appear in pgstats, and will be sorted by
+  * next_worker from highest (head) to lowest (tail).
+  *
+  * Receives a previous database list, and the Oid of the database that made
+  * this list be generated (we call this the "new" database, because when the
+  * database was already present on the list, we expect that this function is
+  * not called at all).  If not NULL, the previous list is used to carry over
+  * each entry's "next_worker" value.  The new database gets its next_worker
+  * set so that it is not visited until autovacuum_naptime seconds from now.
+  *
+  * FIXME: when receiving a new database, we should re-schedule the first
+  * database in the list.  This is to avoid "ganging" the two databases together
+  * in the next iteration.  Another option would be to reschedule all the
+  * databases, so that the times are regularly spread in the whole autovac
+  * naptime interval.
+  */
+ static Dllist *
+ get_database_dllist(Dllist *prevlist, Oid newdb)
+ {
+ 	Dllist	   *newdllist;
+ 	List	   *dblist;
+ 	ListCell   *cell;
+ 	autovac_dbase *dbary;
+ 	int			i;
+ 	TimestampTz	current_time;
+ 	TimestampTz	initial_time;
+ 	int			millis_increment = 0;
+ 	int			total_dbs;
+ 	int			unset_dbs = 0;
+ 
+ 	/*
+ 	 * To build a sorted dllist, we first store the elements in a fixed-size
+ 	 * array, which we sort, and finally we store the individual elements in
+ 	 * the doubly linked list.
+ 	 */
+ 	dblist = get_database_list();
+ 	current_time = GetCurrentTimestamp();
+ 
+ 	/*
+ 	 * The new database array.  We must not free it: its elements will become
+ 	 * the elements of the Dllist.
+ 	 */
+ 	dbary = palloc(sizeof(autovac_dbase) * list_length(dblist));
+ 
+ 	i = 0;
+ 	foreach(cell, dblist)
+ 	{
+ 		autovac_dbase   *avdb = lfirst(cell);
+ 		PgStat_StatDBEntry *dbentry;
+ 		Dlelem		   *elm;
+ 		
+ 		Assert(avdb->ad_next_worker == 0);
+ 
+ 		dbentry = pgstat_fetch_stat_dbentry(avdb->ad_datid);
+ 		/* skip DBs without pgstat entry */
+ 		if (dbentry == NULL)
+ 			continue;
+ 
+ 		/* We set the new database to "now + autovacuum_naptime" */
+ 		if (avdb->ad_datid == newdb)
+ 		{
+ 			avdb->ad_next_worker =
+ 				TimestampTzPlusMilliseconds(current_time,
+ 											autovacuum_naptime * 1000);
+ 		}
+ 		else
+ 		{
  			/*
! 			 * Otherwise, if the database has an entry on the old list, copy
! 			 * the next_worker field into the new list.
  			 */
! 			elm = prevlist ? DLGetHead(prevlist) : NULL;
! 			while (elm != NULL)
! 			{
! 				autovac_dbase	*tmp = DLE_VAL(elm);
! 
! 				if (tmp->ad_datid == avdb->ad_datid)
! 				{
! 					avdb->ad_next_worker = tmp->ad_next_worker;
! 					break;
! 				}
! 				elm = DLGetSucc(elm);
! 			}
  		}
  
! 		/* databases still at zero here get their time assigned below */
! 		if (avdb->ad_next_worker == 0)
! 			unset_dbs++;
! 
! 		/* copy the entry into the array */
! 		memcpy(&(dbary[i++]), avdb, sizeof(autovac_dbase));
! 	}
! 
! 	total_dbs = i;
! 	if (unset_dbs > 0)
! 		millis_increment = autovacuum_naptime * 1000 / unset_dbs;
! 	initial_time = TimestampTzPlusMilliseconds(current_time, millis_increment);
! 
! 	/* now assign evenly spaced times to the databases that have none yet */
! 	for (i = 0; i < total_dbs; i++)
! 	{
! 		autovac_dbase	*avdb = &dbary[i];
! 
! 		if (avdb->ad_next_worker == 0)
  		{
! 			avdb->ad_next_worker = initial_time;
! 			initial_time = TimestampTzPlusMilliseconds(initial_time,
! 													   millis_increment);
! 		}
! 	}
! 
! 	/* FIXME: free the remnants of the old list */
! 
! 	/* sort the array by ascending next_worker (i equals total_dbs here) */
! 	qsort(dbary, i, sizeof(autovac_dbase), avdb_comparator);
! 
! 
! 	/* enter each array element at the head, so the list ends up descending */
! 	newdllist = DLNewList();
! 	for (i = 0; i < total_dbs; i++)
! 	{
! 		Dlelem		   *elem;
! 
! 		/* insert the i-th array element as a dlelem into the new list */
! 		elem = DLNewElem(&dbary[i]);
! 
! 		DLAddHead(newdllist, elem);
! 	}
! 
! 	return newdllist;
! }
! 
! /* qsort comparator for autovac_dbase: orders by ascending ad_next_worker */
! static int
! avdb_comparator(const void *a, const void *b)
! {
! 	TimestampTz	lhs = ((const autovac_dbase *) a)->ad_next_worker;
! 	TimestampTz	rhs = ((const autovac_dbase *) b)->ad_next_worker;
! 
! 	return (lhs == rhs) ? 0 : ((lhs > rhs) ? 1 : -1);
! }
! 
! /*
!  * get_database_list
!  *
!  *		Return a List of autovac_dbase entries, one per flat-file database.
!  *
!  *		Note we cannot use pg_database, because we aren't connected; we use the
!  *		flat database file.
!  */
! static List *
! get_database_list(void)
! {
! 	char	   *filename;
! 	List	   *dblist = NIL;
! 	char		thisname[NAMEDATALEN];
! 	FILE	   *db_file;
! 	Oid			db_id;
! 	Oid			db_tablespace;
! 	TransactionId db_frozenxid;
! 
! 	filename = database_getflatfilename();
! 	db_file = AllocateFile(filename, "r");
! 	if (db_file == NULL)
! 		ereport(FATAL,
! 				(errcode_for_file_access(),
! 				 errmsg("could not open file \"%s\": %m", filename)));
! 
! 	while (read_pg_database_line(db_file, thisname, &db_id,
! 								 &db_tablespace, &db_frozenxid))
! 	{
! 		autovac_dbase *avdb;
! 
! 		avdb = (autovac_dbase *) palloc(sizeof(autovac_dbase));
  
! 		avdb->ad_datid = db_id;
! 		avdb->ad_name = pstrdup(thisname);
! 		avdb->ad_frozenxid = db_frozenxid;
! 		/* these are filled in later by the callers: */
! 		avdb->ad_next_worker = 0;
! 		avdb->ad_entry = NULL;
! 
! 		dblist = lappend(dblist, avdb);
! 	}
! 
! 	FreeFile(db_file);
! 	pfree(filename);
! 
! 	return dblist;
! }
! 
! 
! /*
!  * do_start_worker
!  *
!  * Bare-bones procedure for starting an autovacuum worker from the launcher.
!  * It determines what database to work on, sets up shared memory stuff and
!  * signals postmaster to start the worker.  It fails gracefully if invoked when
!  * all autovacuum_max_workers worker slots are already in use.
!  *
!  * Return value is the OID of the database that the worker is going to process,
!  * or InvalidOid if no worker was actually started.
!  */
! static Oid
! do_start_worker(Dllist *database_list)
! {
! 	List	   *dblist;
! 	WorkerInfo *worker;
! 	int			i;
! 	ListCell   *cell;
! 	TransactionId xidForceLimit;
! 	bool		for_xid_wrap;
! 	autovac_dbase *avdb;
! 	TimestampTz		current_time;
! 
! 	/*
! 	 * Find an unused WorkerInfo entry to set up.  If there is none, return
! 	 * InvalidOid without starting anything.
! 	 *
! 	 * NB: we only read the array here, and save a pointer where we'll
! 	 * write the entry later.  Since this is the only process that creates
! 	 * new entries into the array, there's no risk that somebody else will
! 	 * use that pointer while we weren't looking.
! 	 */
! 	LWLockAcquire(AutovacuumLock, LW_SHARED);
! 	foreach_worker(i, worker)
! 	{
! 		/* Invalid database OID means unused worker entry; use it */
! 		if (!OidIsValid(worker->wi_dboid))
! 			break;
! 	}
! 	LWLockRelease(AutovacuumLock);
! 
! 	/* they're all used up */
! 	if (i >= autovacuum_max_workers)
! 		return InvalidOid;
! 
! 	/* Get a list of databases */
! 	dblist = get_database_list();
! 
! 	/*
! 	 * Determine the oldest datfrozenxid/relfrozenxid that we will allow
! 	 * to pass without forcing a vacuum.  (This limit can be tightened for
! 	 * particular tables, but not loosened.)
! 	 */
! 	recentXid = ReadNewTransactionId();
! 	xidForceLimit = recentXid - autovacuum_freeze_max_age;
! 	/* ensure it's a "normal" XID, else TransactionIdPrecedes misbehaves */
! 	if (xidForceLimit < FirstNormalTransactionId)
! 		xidForceLimit -= FirstNormalTransactionId;
! 
! 	/*
! 	 * Choose a database to connect to.  We pick the database that was least
! 	 * recently auto-vacuumed, or one that needs vacuuming to prevent Xid
! 	 * wraparound-related data loss.  If any db at risk of wraparound is
! 	 * found, we pick the one with oldest datfrozenxid, independently of
! 	 * autovacuum times.
! 	 *
! 	 * Note that a database with no stats entry is not considered, except for
! 	 * Xid wraparound purposes.  The theory is that if no one has ever
! 	 * connected to it since the stats were last initialized, it doesn't need
! 	 * vacuuming.
! 	 *
! 	 * XXX This could be improved if we had more info about whether it needs
! 	 * vacuuming before connecting to it.  Perhaps look through the pgstats
! 	 * data for the database's tables?  One idea is to keep track of the
! 	 * number of new and dead tuples per database in pgstats.  However it
! 	 * isn't clear how to construct a metric that measures that and not cause
! 	 * starvation for less busy databases.
! 	 */
! 	avdb = NULL;
! 	for_xid_wrap = false;
! 	current_time = GetCurrentTimestamp();
! 	foreach(cell, dblist)
! 	{
! 		autovac_dbase *tmp = lfirst(cell);
! 		bool		skipit;
! 		Dlelem	   *elem;
! 
! 		/* Find pgstat entry if any */
! 		tmp->ad_entry = pgstat_fetch_stat_dbentry(tmp->ad_datid);
! 
! 		/* Check to see if this one is at risk of wraparound */
! 		if (TransactionIdPrecedes(tmp->ad_frozenxid, xidForceLimit))
! 		{
! 			if (avdb == NULL ||
! 				TransactionIdPrecedes(tmp->ad_frozenxid, avdb->ad_frozenxid))
! 				avdb = tmp;
! 			for_xid_wrap = true;
! 			continue;
  		}
! 		else if (for_xid_wrap)
! 			continue;			/* ignore not-at-risk DBs */
! 
  		/*
! 		 * Otherwise, skip a database with no pgstat entry; it means it
! 		 * hasn't seen any activity.
  		 */
! 		if (!tmp->ad_entry)
! 			continue;
  
! 		/*
! 		 * Also, skip a database that appears on the passed database list as
! 		 * having been processed recently (less than autovacuum_naptime seconds
! 		 * ago).  We do this so that we don't select a database which we just
! 		 * selected, but that pgstat hasn't gotten around to updating the
! 		 * last autovacuum time yet.
! 		 */
! 		skipit = false;
! 		elem = database_list ? DLGetTail(database_list) : NULL;
! 		while (elem != NULL)
! 		{
! 			autovac_dbase *dbp = DLE_VAL(elem);
  
! 			if (dbp->ad_datid == tmp->ad_datid)
! 			{
! 				TimestampTz		curr_plus_naptime;
! 				TimestampTz		next = dbp->ad_next_worker;
! 				
! 				curr_plus_naptime =
! 					TimestampTzPlusMilliseconds(current_time,
! 												autovacuum_naptime * 1000);
! 
! 				/*
! 				 * What we want here is to skip if next_worker falls between
! 				 * the current time and the current time plus naptime.
! 				 */
! 				if (timestamp_cmp_internal(current_time, next) > 0)
! 					skipit = false;
! 				else if (timestamp_cmp_internal(next, curr_plus_naptime) > 0)
! 					skipit = false;
! 				else
! 					skipit = true;
! 
! 				break;
! 			}
! 			elem = DLGetPred(elem);
! 		}
! 		if (skipit)
! 			continue;
! 
! 		/*
! 		 * Remember the db with oldest autovac time.  (If we are here,
! 		 * both tmp->ad_entry and avdb->ad_entry must be non-null.)
! 		 */
! 		if (avdb == NULL ||
! 			tmp->ad_entry->last_autovac_time < avdb->ad_entry->last_autovac_time)
! 			avdb = tmp;
  	}
  
! 	/* Found a database -- process it */
! 	if (avdb != NULL)
! 	{
! 		LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 		Assert(!OidIsValid(worker->wi_dboid));
! 		worker->wi_dboid = avdb->ad_datid;
! 		worker->wi_workerpid = 0;
! 		LWLockRelease(AutovacuumLock);
  
! 		SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_WORKER);
! 
! 		return avdb->ad_datid;
! 	}
! 
! 	return InvalidOid;
  }
  
+ /*
+  * launch_worker
+  *
+  * Wrapper for starting a worker from the launcher.  Besides actually starting
+  * it, update the database list to reflect the next time that another one will
+  * need to be started on the selected database.  The actual database choice is
+  * left to do_start_worker.
+  *
+  * This routine is also expected to insert an entry into the database list if
+  * the selected database was previously absent from the list.  It returns the
+  * new database list.
+  */
+ static Dllist *
+ launch_worker(Dllist *database_list, TimestampTz now)
+ {
+ 	Oid		dbid;
+ 	Dlelem *elem;
+ 
+ 	dbid = do_start_worker(database_list);
+ 	if (OidIsValid(dbid))
+ 		free_workers--;
+ 
+ 	/*
+ 	 * Walk the database list and update corresponding entry.  If it's not on
+ 	 * the list, we'll recreate the list.
+ 	 */
+ 	elem = DLGetHead(database_list);
+ 	while (elem != NULL)
+ 	{
+ 		autovac_dbase *avdb = DLE_VAL(elem);
+ 
+ 		if (avdb->ad_datid == dbid)
+ 		{
+ 			/*
+ 			 * add autovacuum_naptime seconds to the current time, and use that
+ 			 * as the new "next_worker" field for this database.
+ 			 */
+ 			avdb->ad_next_worker =
+ 				TimestampTzPlusMilliseconds(now, autovacuum_naptime * 1000);
+ 
+ 			DLMoveToFront(elem);
+ 			break;
+ 		}
+ 		elem = DLGetSucc(elem);
+ 	}
+ 
+ 	/*
+ 	 * If the database was not present in the database list, we rebuild the
+ 	 * list.  It's possible that the database does not get into the list
+ 	 * anyway, for example if it's a database that doesn't have a pgstat entry,
+ 	 * but this is not a problem because we don't want to schedule workers
+ 	 * regularly into those in any case.
+ 	 *
+ 	 */
+ 	if (elem == NULL)
+ 		database_list = get_database_dllist(database_list, dbid);
+ 
+ 	return database_list;
+ }
+ 
+ 
  /* SIGHUP: set flag to re-read config file at next convenient time */
  static void
  avl_sighup_handler(SIGNAL_ARGS)
*************** avl_sighup_handler(SIGNAL_ARGS)
*** 518,523 ****
--- 1046,1058 ----
  	got_SIGHUP = true;
  }
  
+ /* SIGUSR1: a worker is up and running, or just finished */
+ static void
+ avl_sigusr1_handler(SIGNAL_ARGS)
+ {
+ 	got_SIGUSR1 = true;
+ }
+ 
  static void
  avlauncher_shutdown(SIGNAL_ARGS)
  {
*************** NON_EXEC_STATIC void
*** 633,639 ****
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
--- 1168,1176 ----
  AutoVacWorkerMain(int argc, char *argv[])
  {
  	sigjmp_buf	local_sigjmp_buf;
! 	Oid			dbid = InvalidOid;
! 	WorkerInfo *worker;
! 	int			i;
  
  	/* we are a postmaster subprocess now */
  	IsUnderPostmaster = true;
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 731,752 ****
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Get the database Id we're going to work on, and announce our PID
! 	 * in the shared memory area.  We remove the database OID immediately
! 	 * from the shared memory area.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 
! 	dbid = AutoVacuumShmem->process_db;
! 	AutoVacuumShmem->process_db = InvalidOid;
! 	AutoVacuumShmem->worker_pid = MyProcPid;
! 
  	LWLockRelease(AutovacuumLock);
  
  	if (OidIsValid(dbid))
  	{
  		char	*dbname;
- 		PgStat_StatDBEntry *dbentry;
  
  		/*
  		 * Report autovac startup to the stats collector.  We deliberately do
--- 1268,1295 ----
  	SetConfigOption("zero_damaged_pages", "false", PGC_SUSET, PGC_S_OVERRIDE);
  
  	/*
! 	 * Walk the WorkerInfo array, and get the database OID we're going to work
! 	 * on.  Use the first entry with PID 0 in the list, and advertise our PID
! 	 * on it, thus marking it used.
  	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	foreach_worker(i, worker)
! 	{
! 		if (worker->wi_workerpid == 0)
! 		{
! 			dbid = worker->wi_dboid;
! 			worker->wi_workerpid = MyProcPid;
! 			break;
! 		}
! 	}
! 	if (AutoVacuumShmem->av_launcherpid != 0)
! 		kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
  	LWLockRelease(AutovacuumLock);
  
+ 
  	if (OidIsValid(dbid))
  	{
  		char	*dbname;
  
  		/*
  		 * Report autovac startup to the stats collector.  We deliberately do
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 765,770 ****
--- 1308,1314 ----
  		 * stale stats info), we'll fail and exit here.
  		 */
  		InitPostgres(NULL, dbid, NULL, &dbname);
+ 
  		SetProcessingMode(NormalProcessing);
  		set_ps_display(dbname, false);
  		ereport(DEBUG1,
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 779,794 ****
  
  		/* And do an appropriate amount of work */
  		recentXid = ReadNewTransactionId();
! 		dbentry = pgstat_fetch_stat_dbentry(dbid);
! 		do_autovacuum(dbentry);
  	}
  
- 	/*
- 	 * Now remove our PID from shared memory, so that the launcher can start
- 	 * another worker as soon as appropriate.
- 	 */
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	AutoVacuumShmem->worker_pid = 0;
  	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
--- 1323,1350 ----
  
  		/* And do an appropriate amount of work */
  		recentXid = ReadNewTransactionId();
! 		do_autovacuum(worker, dbid);
  	}
  
  	LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE);
! 	if (!autovacuum_start_daemon)
! 	{
! 		/* in emergency mode we must cleanup after ourselves */
! 		worker->wi_workerpid = 0;
! 		worker->wi_dboid = InvalidOid;
! 		worker->wi_finished = false;
! 	}
! 	else
! 	{
! 		/*
! 		 * Otherwise, let the launcher know we're done.  Warning: must set the
! 		 * flag before sending the signal.  XXX do we need to prevent
! 		 * overenthusiastic compiler optimization here?
! 		 */
! 		worker->wi_finished = true;
! 		if (AutoVacuumShmem->av_launcherpid != 0)
! 			kill(AutoVacuumShmem->av_launcherpid, SIGUSR1);
! 	}
  	LWLockRelease(AutovacuumLock);
  
  	/* All done, go away */
*************** AutoVacWorkerMain(int argc, char *argv[]
*** 796,858 ****
  }
  
  /*
-  * autovac_get_database_list
-  *
-  *		Return a list of all databases.  Note we cannot use pg_database,
-  *		because we aren't connected; we use the flat database file.
-  */
- static List *
- autovac_get_database_list(void)
- {
- 	char	   *filename;
- 	List	   *dblist = NIL;
- 	char		thisname[NAMEDATALEN];
- 	FILE	   *db_file;
- 	Oid			db_id;
- 	Oid			db_tablespace;
- 	TransactionId db_frozenxid;
- 
- 	filename = database_getflatfilename();
- 	db_file = AllocateFile(filename, "r");
- 	if (db_file == NULL)
- 		ereport(FATAL,
- 				(errcode_for_file_access(),
- 				 errmsg("could not open file \"%s\": %m", filename)));
- 
- 	while (read_pg_database_line(db_file, thisname, &db_id,
- 								 &db_tablespace, &db_frozenxid))
- 	{
- 		autovac_dbase *db;
- 
- 		db = (autovac_dbase *) palloc(sizeof(autovac_dbase));
- 
- 		db->oid = db_id;
- 		db->name = pstrdup(thisname);
- 		db->frozenxid = db_frozenxid;
- 		/* this gets set later: */
- 		db->entry = NULL;
- 
- 		dblist = lappend(dblist, db);
- 	}
- 
- 	FreeFile(db_file);
- 	pfree(filename);
- 
- 	return dblist;
- }
- 
- /*
   * Process a database table-by-table
   *
-  * dbentry is either a pointer to the database entry in the stats databases
-  * hash table, or NULL if we couldn't find any entry (the latter case occurs
-  * only if we are forcing a vacuum for anti-wrap purposes).
-  *
   * Note that CHECK_FOR_INTERRUPTS is supposed to be used in certain spots in
   * order not to ignore shutdown commands for too long.
   */
  static void
! do_autovacuum(PgStat_StatDBEntry *dbentry)
  {
  	Relation	classRel,
  				avRel;
--- 1352,1364 ----
  }
  
  /*
   * Process a database table-by-table
   *
   * Note that CHECK_FOR_INTERRUPTS is supposed to be used in certain spots in
   * order not to ignore shutdown commands for too long.
   */
  static void
! do_autovacuum(WorkerInfo *worker, Oid dbid)
  {
  	Relation	classRel,
  				avRel;
*************** do_autovacuum(PgStat_StatDBEntry *dbentr
*** 863,868 ****
--- 1369,1381 ----
  	List	   *toast_table_ids = NIL;
  	ListCell   *cell;
  	PgStat_StatDBEntry *shared;
+ 	PgStat_StatDBEntry *dbentry;
+ 
+ 	/*
+ 	 * may be NULL if we couldn't find an entry (only happens if we
+ 	 * are forcing a vacuum for anti-wrap purposes).
+ 	 */
+ 	dbentry = pgstat_fetch_stat_dbentry(dbid);
  
  	/* Start a transaction so our commands have one to play into. */
  	StartTransactionCommand();
*************** do_autovacuum(PgStat_StatDBEntry *dbentr
*** 933,941 ****
  		Form_pg_class classForm = (Form_pg_class) GETSTRUCT(tuple);
  		Form_pg_autovacuum avForm = NULL;
  		PgStat_StatTabEntry *tabentry;
- 		SysScanDesc avScan;
  		HeapTuple	avTup;
- 		ScanKeyData entry[1];
  		Oid			relid;
  
  		/* Consider only regular and toast tables. */
--- 1446,1452 ----
*************** do_autovacuum(PgStat_StatDBEntry *dbentr
*** 952,967 ****
  
  		relid = HeapTupleGetOid(tuple);
  
! 		/* See if we have a pg_autovacuum entry for this relation. */
! 		ScanKeyInit(&entry[0],
! 					Anum_pg_autovacuum_vacrelid,
! 					BTEqualStrategyNumber, F_OIDEQ,
! 					ObjectIdGetDatum(relid));
! 
! 		avScan = systable_beginscan(avRel, AutovacuumRelidIndexId, true,
! 									SnapshotNow, 1, entry);
! 
! 		avTup = systable_getnext(avScan);
  
  		if (HeapTupleIsValid(avTup))
  			avForm = (Form_pg_autovacuum) GETSTRUCT(avTup);
--- 1463,1469 ----
  
  		relid = HeapTupleGetOid(tuple);
  
! 		avTup = get_pg_autovacuum_tuple_relid(avRel, relid);
  
  		if (HeapTupleIsValid(avTup))
  			avForm = (Form_pg_autovacuum) GETSTRUCT(avTup);
*************** do_autovacuum(PgStat_StatDBEntry *dbentr
*** 978,1023 ****
  		test_rel_for_autovac(relid, tabentry, classForm, avForm,
  							 &vacuum_tables, &toast_table_ids);
  
! 		systable_endscan(avScan);
  	}
  
  	heap_endscan(relScan);
  	heap_close(avRel, AccessShareLock);
  	heap_close(classRel, AccessShareLock);
  
! 	/*
! 	 * Perform operations on collected tables.
! 	 */
  	foreach(cell, vacuum_tables)
  	{
  		autovac_table *tab = lfirst(cell);
  
  		CHECK_FOR_INTERRUPTS();
  
  		/*
! 		 * Check to see if we need to force vacuuming of this table because
! 		 * its toast table needs it.
  		 */
! 		if (OidIsValid(tab->toastrelid) && !tab->dovacuum &&
! 			list_member_oid(toast_table_ids, tab->toastrelid))
  		{
! 			tab->dovacuum = true;
! 			elog(DEBUG2, "autovac: VACUUM %u because of TOAST table",
! 				 tab->relid);
  		}
  
! 		/* Otherwise, ignore table if it needs no work */
! 		if (!tab->dovacuum && !tab->doanalyze)
  			continue;
  
  		/* Set the vacuum cost parameters for this table */
! 		VacuumCostDelay = tab->vacuum_cost_delay;
! 		VacuumCostLimit = tab->vacuum_cost_limit;
  
! 		autovacuum_do_vac_analyze(tab->relid,
! 								  tab->dovacuum,
! 								  tab->doanalyze,
! 								  tab->freeze_min_age);
  	}
  
  	/*
--- 1480,1558 ----
  		test_rel_for_autovac(relid, tabentry, classForm, avForm,
  							 &vacuum_tables, &toast_table_ids);
  
! 		if (HeapTupleIsValid(avTup))
! 			heap_freetuple(avTup);
  	}
  
  	heap_endscan(relScan);
  	heap_close(avRel, AccessShareLock);
  	heap_close(classRel, AccessShareLock);
  
! 	filter_table_list(&vacuum_tables, &toast_table_ids);
! 
! 	/* Perform operations on collected tables. */
  	foreach(cell, vacuum_tables)
  	{
  		autovac_table *tab = lfirst(cell);
+ 		WorkerInfo *other_worker;
+ 		int			i;
+ 		bool		skipit;
  
  		CHECK_FOR_INTERRUPTS();
  
+ 		Assert(tab->at_dovacuum || tab->at_doanalyze);
+ 		LWLockAcquire(AutovacuumScheduleLock, LW_EXCLUSIVE);
+ 
  		/*
! 		 * Check whether the table is being vacuumed concurrently by another
! 		 * worker.
  		 */
! 		skipit = false;
! 		foreach_worker(i, other_worker)
  		{
! 			if (other_worker->wi_tableoid == tab->at_relid)
! 			{
! 				LWLockRelease(AutovacuumScheduleLock);
! 				skipit = true;
! 				break;
! 			}
  		}
+ 		if (skipit)
+ 			continue;
  
! 		/*
! 		 * Check whether pgstat data still says we need to vacuum this table.
! 		 * It could have changed if another worker processed the table while we
! 		 * weren't looking; and pg_autovacuum parameters could have changed
! 		 * as well.
! 		 *
! 		 * We need fresh pgstat data for this.
! 		 *
! 		 * FIXME we ignore the possibility that the table was finished being
! 		 * vacuumed in the last 500ms (PGSTAT_STAT_INTERVAL).  This is a bug.
! 		 */
! 		pgstat_clear_snapshot();
! 		if (!recheck_autovac_condition(tab->at_relid))
! 		{
! 			LWLockRelease(AutovacuumScheduleLock);
  			continue;
+ 		}
+ 
+ 		/*
+ 		 * Ok, good to go.  Store the table in shared memory before releasing
+ 		 * lock so that no one else vacuums it concurrently.
+ 		 */
+ 		worker->wi_tableoid = tab->at_relid;
+ 		LWLockRelease(AutovacuumScheduleLock);
  
  		/* Set the vacuum cost parameters for this table */
! 		VacuumCostDelay = tab->at_vacuum_cost_delay;
! 		VacuumCostLimit = tab->at_vacuum_cost_limit;
  
! 		autovacuum_do_vac_analyze(tab->at_relid,
! 								  tab->at_dovacuum,
! 								  tab->at_doanalyze,
! 								  tab->at_freeze_min_age);
  	}
  
  	/*
*************** do_autovacuum(PgStat_StatDBEntry *dbentr
*** 1031,1040 ****
  }
  
  /*
!  * test_rel_for_autovac
   *
!  * Check whether a table needs to be vacuumed or analyzed.	Add it to the
!  * appropriate output list if so.
   *
   * A table needs to be vacuumed if the number of dead tuples exceeds a
   * threshold.  This threshold is calculated as
--- 1566,1671 ----
  }
  
  /*
!  * Returns a copy of the pg_autovacuum tuple for the given relid, or NULL if
!  * there isn't any.  avRel is pg_autovacuum, already open and suitably locked.
!  */
! static HeapTuple
! get_pg_autovacuum_tuple_relid(Relation avRel, Oid relid)
! {
! 	ScanKeyData entry[1];
! 	SysScanDesc avScan;
! 	HeapTuple	avTup;
! 
! 	ScanKeyInit(&entry[0],
! 				Anum_pg_autovacuum_vacrelid,
! 				BTEqualStrategyNumber, F_OIDEQ,
! 				ObjectIdGetDatum(relid));
! 
! 	avScan = systable_beginscan(avRel, AutovacuumRelidIndexId, true,
! 								SnapshotNow, 1, entry);
! 
! 	avTup = systable_getnext(avScan);
! 
! 	if (HeapTupleIsValid(avTup))
! 		avTup = heap_copytuple(avTup);
! 
! 	systable_endscan(avScan);
! 
! 	return avTup;
! }
! 
! static autovac_table *
! recheck_autovac_condition(Oid relid)
! {
! 	Relation	rel;
! 	Relation	avRel;
! 	PgStat_StatTabEntry *tabentry;
! 	autovac_table	*tab;
! 	Form_pg_autovacuum avForm = NULL;
! 	HeapTuple	avTup;
! 
! 	rel = relation_open(relid, AccessShareLock);
! 	tabentry = pgstat_fetch_stat_tabentry(relid);
! 
! 	/* get the pg_autovacuum tuple */
! 	avRel = heap_open(AutovacuumRelationId, AccessShareLock);
! 	avTup = get_pg_autovacuum_tuple_relid(avRel, relid);
! 	if (HeapTupleIsValid(avTup))
! 		avForm = (Form_pg_autovacuum) GETSTRUCT(avTup);
! 
! 	tab = palloc(sizeof(autovac_table));
! 
! 	if (relation_needs_autovacuum(relid, tabentry, rel->rd_rel, NULL, &tab))
! 	{
! 		heap_freetuple(avTup);
! 		heap_close(avRel, AccessShareLock);
! 		return tab;
! 	}
! 
! 	/* FIXME: grab the TOAST table info and do the test for it */
! 
! 	pfree(tab);
! 	heap_freetuple(avTup);
! 	heap_close(avRel, AccessShareLock);
! 	return NULL;
! }
! 
! static void
! test_rel_for_autovac(Oid relid, PgStat_StatTabEntry *tabentry,
! 					 Form_pg_class classForm,
! 					 Form_pg_autovacuum avForm,
! 					 List **vacuum_tables,
! 					 List **toast_table_ids)
! {
! 	autovac_table	*tab;
! 
! 	tab = palloc(sizeof(autovac_table));
! 
! 	if (relation_needs_autovacuum(relid, tabentry, classForm, avForm, &tab))
! 	{
! 		if (classForm->relkind == RELKIND_RELATION)
! 		{
! 			*vacuum_tables = lappend(*vacuum_tables, tab);
! 		}
! 		else
! 		{
! 			Assert(classForm->relkind == RELKIND_TOASTVALUE);
! 			*toast_table_ids = lappend_oid(*toast_table_ids, relid);
! 			pfree(tab);
! 		}
! 	}
! 	else
! 		pfree(tab);
! }
! 
! 
! /*
!  * relation_needs_autovacuum
   *
!  * Check whether a table needs to be vacuumed or analyzed.  Returns true for
!  * a plain table needing vacuum or analyze (or merely having a TOAST table),
!  * and for a TOAST table needing vacuum.  For plain tables the caller-supplied
!  * "av_tab" struct is filled in; for TOAST tables it is left untouched.
   *
   * A table needs to be vacuumed if the number of dead tuples exceeds a
   * threshold.  This threshold is calculated as
*************** do_autovacuum(PgStat_StatDBEntry *dbentr
*** 1060,1075 ****
   * value <0 is substituted with the value of
   * autovacuum_vacuum_scale_factor GUC variable.  Ditto for analyze.
   */
! static void
! test_rel_for_autovac(Oid relid, PgStat_StatTabEntry *tabentry,
! 					 Form_pg_class classForm,
! 					 Form_pg_autovacuum avForm,
! 					 List **vacuum_tables,
! 					 List **toast_table_ids)
  {
  	bool		force_vacuum;
  	bool		dovacuum;
  	bool		doanalyze;
  	float4		reltuples;		/* pg_class.reltuples */
  	/* constants from pg_autovacuum or GUC variables */
  	int			vac_base_thresh,
--- 1691,1706 ----
   * value <0 is substituted with the value of
   * autovacuum_vacuum_scale_factor GUC variable.  Ditto for analyze.
   */
! static bool
! relation_needs_autovacuum(Oid relid, PgStat_StatTabEntry *tabentry,
! 						  Form_pg_class classForm,
! 						  Form_pg_autovacuum avForm,
! 						  autovac_table **av_tab)
  {
  	bool		force_vacuum;
  	bool		dovacuum;
  	bool		doanalyze;
+ 	bool		result;
  	float4		reltuples;		/* pg_class.reltuples */
  	/* constants from pg_autovacuum or GUC variables */
  	int			vac_base_thresh,
*************** test_rel_for_autovac(Oid relid, PgStat_S
*** 1151,1157 ****
  
  	/* User disabled it in pg_autovacuum?  (But ignore if at risk) */
  	if (avForm && !avForm->enabled && !force_vacuum)
! 		return;
  
  	if (PointerIsValid(tabentry))
  	{
--- 1782,1788 ----
  
  	/* User disabled it in pg_autovacuum?  (But ignore if at risk) */
  	if (avForm && !avForm->enabled && !force_vacuum)
! 		return false;
  
  	if (PointerIsValid(tabentry))
  	{
*************** test_rel_for_autovac(Oid relid, PgStat_S
*** 1207,1269 ****
  		 */
  		if (dovacuum || doanalyze || OidIsValid(classForm->reltoastrelid))
  		{
! 			autovac_table *tab;
  
! 			tab = (autovac_table *) palloc(sizeof(autovac_table));
! 			tab->relid = relid;
! 			tab->toastrelid = classForm->reltoastrelid;
! 			tab->dovacuum = dovacuum;
! 			tab->doanalyze = doanalyze;
! 			tab->freeze_min_age = freeze_min_age;
! 			tab->vacuum_cost_limit = vac_cost_limit;
! 			tab->vacuum_cost_delay = vac_cost_delay;
! 
! 			*vacuum_tables = lappend(*vacuum_tables, tab);
  		}
  	}
  	else
  	{
  		Assert(classForm->relkind == RELKIND_TOASTVALUE);
! 		if (dovacuum)
! 			*toast_table_ids = lappend_oid(*toast_table_ids, relid);
  	}
  }
  
  /*
!  * autovacuum_do_vac_analyze
!  *		Vacuum and/or analyze the specified table
   */
  static void
! autovacuum_do_vac_analyze(Oid relid, bool dovacuum, bool doanalyze,
! 						  int freeze_min_age)
  {
! 	VacuumStmt *vacstmt;
! 	MemoryContext old_cxt;
  
  	/*
! 	 * The node must survive transaction boundaries, so make sure we create it
! 	 * in a long-lived context
  	 */
! 	old_cxt = MemoryContextSwitchTo(AutovacMemCxt);
  
! 	vacstmt = makeNode(VacuumStmt);
  
  	/* Set up command parameters */
! 	vacstmt->vacuum = dovacuum;
! 	vacstmt->full = false;
! 	vacstmt->analyze = doanalyze;
! 	vacstmt->freeze_min_age = freeze_min_age;
! 	vacstmt->verbose = false;
! 	vacstmt->relation = NULL;	/* not used since we pass a relids list */
! 	vacstmt->va_cols = NIL;
  
  	/* Let pgstat know what we're doing */
! 	autovac_report_activity(vacstmt, relid);
! 
! 	vacuum(vacstmt, list_make1_oid(relid), true);
  
! 	pfree(vacstmt);
! 	MemoryContextSwitchTo(old_cxt);
  }
  
  /*
--- 1838,1945 ----
  		 */
  		if (dovacuum || doanalyze || OidIsValid(classForm->reltoastrelid))
  		{
! 			autovac_table	*tab = *av_tab;
  
! 			tab->at_relid = relid;
! 			tab->at_toastrelid = classForm->reltoastrelid;
! 			tab->at_dovacuum = dovacuum;
! 			tab->at_doanalyze = doanalyze;
! 			tab->at_freeze_min_age = freeze_min_age;
! 			tab->at_vacuum_cost_limit = vac_cost_limit;
! 			tab->at_vacuum_cost_delay = vac_cost_delay;
! 			result = true;
  		}
+ 		else
+ 			result = false;
  	}
  	else
  	{
  		Assert(classForm->relkind == RELKIND_TOASTVALUE);
! 		result = dovacuum;
  	}
+ 
+ 	return result;
  }
  
  /*
!  * After building the complete table lists, we filter out the tables that don't
!  * need any vacuuming.  We must do this after scanning pg_class, because
!  * otherwise we risk missing tables that need vacuum due to their
!  * toast tables needing vacuum.
   */
  static void
! filter_table_list(List **vacuum_tables, List **toast_table_ids)
  {
! 	ListCell   *cell;
! 	ListCell   *prev;
  
  	/*
! 	 * We can now filter out tables that don't need vacuuming, and set the
! 	 * dovacuum flag for those whose toast table does.
  	 */
! 	prev = NULL;
! 	cell = list_head(*vacuum_tables);
  
! 	while (cell != NULL)
! 	{
! 		autovac_table  *tab = lfirst(cell);
! 		bool		doit;
! 
! 		if (tab->at_dovacuum)
! 			doit = true;
! 		else
! 		{
! 			if (OidIsValid(tab->at_toastrelid))
! 				doit = list_member_oid(*toast_table_ids, tab->at_toastrelid);
! 			else
! 				doit = false;
! 		}
! 
! 		if (doit)
! 		{
! 			/*
! 			 * keep table on list; set dovacuum (redundant if already set)
! 			 */
! 			tab->at_dovacuum = true;
! 			prev = cell;
! 			cell = lnext(prev);
! 		}
! 		else
! 		{
! 			/* remove table from list, no operation needed */
! 			*vacuum_tables = list_delete_cell(*vacuum_tables, cell, prev);
! 			pfree(tab);
! 			if (prev)
! 				cell = lnext(prev);
! 			else
! 				cell = list_head(*vacuum_tables);
! 		}
! 	}
! }
! 
! /*
!  * autovacuum_do_vac_analyze
!  *		Vacuum and/or analyze the specified table
!  */
! static void
! autovacuum_do_vac_analyze(Oid relid, bool dovacuum, bool doanalyze,
! 						  int freeze_min_age)
! {
! 	VacuumStmt	vacstmt;
  
  	/* Set up command parameters */
! 	vacstmt.vacuum = dovacuum;
! 	vacstmt.full = false;
! 	vacstmt.analyze = doanalyze;
! 	vacstmt.freeze_min_age = freeze_min_age;
! 	vacstmt.verbose = false;
! 	vacstmt.relation = NULL;	/* not used since we pass a relids list */
! 	vacstmt.va_cols = NIL;
  
  	/* Let pgstat know what we're doing */
! 	autovac_report_activity(&vacstmt, relid);
  
! 	vacuum(&vacstmt, list_make1_oid(relid), true);
  }
  
  /*
*************** IsAutoVacuumWorkerProcess(void)
*** 1377,1383 ****
  Size
  AutoVacuumShmemSize(void)
  {
! 	return sizeof(AutoVacuumShmemStruct);
  }
  
  /*
--- 2053,2060 ----
  Size
  AutoVacuumShmemSize(void)
  {
! 	return add_size(offsetof(AutoVacuumShmemStruct, av_workers),
!  					mul_size(autovacuum_max_workers, sizeof(WorkerInfo)));
  }
  
  /*
*************** AutoVacuumShmemInit(void)
*** 1400,1404 ****
  	if (found)
  		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, sizeof(AutoVacuumShmemStruct));
  }
--- 2077,2081 ----
  	if (found)
  		return;                 /* already initialized */
  
! 	MemSet(AutoVacuumShmem, 0, AutoVacuumShmemSize());
  }
Index: src/backend/storage/ipc/procarray.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/ipc/procarray.c,v
retrieving revision 1.21
diff -c -p -r1.21 procarray.c
*** src/backend/storage/ipc/procarray.c	16 Jan 2007 13:28:56 -0000	1.21
--- src/backend/storage/ipc/procarray.c	7 Mar 2007 14:42:20 -0000
***************
*** 36,41 ****
--- 36,42 ----
  #include "access/xact.h"
  #include "access/twophase.h"
  #include "miscadmin.h"
+ #include "postmaster/autovacuum.h"
  #include "storage/procarray.h"
  #include "utils/tqual.h"
  
*************** ProcArrayShmemSize(void)
*** 89,95 ****
  
  	size = offsetof(ProcArrayStruct, procs);
  	size = add_size(size, mul_size(sizeof(PGPROC *),
! 								 add_size(MaxBackends, max_prepared_xacts)));
  
  	return size;
  }
--- 90,98 ----
  
  	size = offsetof(ProcArrayStruct, procs);
  	size = add_size(size, mul_size(sizeof(PGPROC *),
! 								   add_size(add_size(MaxBackends,
! 												 	 max_prepared_xacts),
! 											autovacuum_max_workers)));
  
  	return size;
  }
*************** CreateSharedProcArray(void)
*** 112,118 ****
  		 * We're the first - initialize.
  		 */
  		procArray->numProcs = 0;
! 		procArray->maxProcs = MaxBackends + max_prepared_xacts;
  	}
  }
  
--- 115,121 ----
  		 * We're the first - initialize.
  		 */
  		procArray->numProcs = 0;
! 		procArray->maxProcs = MaxBackends + max_prepared_xacts + autovacuum_max_workers;
  	}
  }
  
Index: src/backend/storage/lmgr/lock.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/storage/lmgr/lock.c,v
retrieving revision 1.176
diff -c -p -r1.176 lock.c
*** src/backend/storage/lmgr/lock.c	1 Feb 2007 19:10:28 -0000	1.176
--- src/backend/storage/lmgr/lock.c	7 Mar 2007 14:42:44 -0000
***************
*** 37,42 ****
--- 37,43 ----
  #include "access/twophase_rmgr.h"
  #include "miscadmin.h"
  #include "pgstat.h"
+ #include "postmaster/autovacuum.h"
  #include "storage/lmgr.h"
  #include "utils/memutils.h"
  #include "utils/ps_status.h"
***************
*** 47,53 ****
  int			max_locks_per_xact; /* set by guc.c */
  
  #define NLOCKENTS() \
! 	mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
  
  
  /*
--- 48,55 ----
  int			max_locks_per_xact; /* set by guc.c */
  
  #define NLOCKENTS() \
! 	mul_size(max_locks_per_xact, \
! 			 add_size(add_size(MaxBackends, max_prepared_xacts), autovacuum_max_workers))
  
  
  /*
Index: src/backend/utils/misc/guc.c
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/backend/utils/misc/guc.c,v
retrieving revision 1.383
diff -c -p -r1.383 guc.c
*** src/backend/utils/misc/guc.c	19 Mar 2007 23:38:30 -0000	1.383
--- src/backend/utils/misc/guc.c	21 Mar 2007 22:05:45 -0000
*************** static struct config_int ConfigureNamesI
*** 1620,1625 ****
--- 1620,1634 ----
  		&autovacuum_freeze_max_age,
  		200000000, 100000000, 2000000000, NULL, NULL
  	},
+ 	{
+ 		/* this is PGC_POSTMASTER because it determines shared memory size */
+ 		{"autovacuum_max_workers", PGC_POSTMASTER, AUTOVACUUM,
+ 			gettext_noop("Sets the maximum number of simultaneously running autovacuum worker processes."),
+ 			NULL
+ 		},
+ 		&autovacuum_max_workers,
+ 		10, 1, INT_MAX, NULL, NULL
+ 	},
  
  	{
  		{"tcp_keepalives_idle", PGC_USERSET, CLIENT_CONN_OTHER,
Index: src/include/postmaster/autovacuum.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/postmaster/autovacuum.h,v
retrieving revision 1.8
diff -c -p -r1.8 autovacuum.h
*** src/include/postmaster/autovacuum.h	15 Feb 2007 23:23:23 -0000	1.8
--- src/include/postmaster/autovacuum.h	7 Mar 2007 14:41:30 -0000
***************
*** 16,21 ****
--- 16,22 ----
  
  /* GUC variables */
  extern bool autovacuum_start_daemon;
+ extern int	autovacuum_max_workers;
  extern int	autovacuum_naptime;
  extern int	autovacuum_vac_thresh;
  extern double autovacuum_vac_scale;
Index: src/include/storage/lwlock.h
===================================================================
RCS file: /home/alvherre/Code/cvs/pgsql/src/include/storage/lwlock.h,v
retrieving revision 1.34
diff -c -p -r1.34 lwlock.h
*** src/include/storage/lwlock.h	15 Feb 2007 23:23:23 -0000	1.34
--- src/include/storage/lwlock.h	14 Mar 2007 16:46:32 -0000
*************** typedef enum LWLockId
*** 62,67 ****
--- 62,68 ----
  	BtreeVacuumLock,
  	AddinShmemInitLock,
  	AutovacuumLock,
+ 	AutovacuumScheduleLock,
  	/* Individual lock IDs end here */
  	FirstBufMappingLock,
  	FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS,
