Re: BitmapHeapScan streaming read user and prelim refactoring

From: James Hunter <james(dot)hunter(dot)pg(at)gmail(dot)com>
To: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
Cc: Melanie Plageman <melanieplageman(at)gmail(dot)com>, Tomas Vondra <tomas(at)vondra(dot)me>, Nazir Bilal Yavuz <byavuz81(at)gmail(dot)com>, Dilip Kumar <dilipbalaut(at)gmail(dot)com>, Heikki Linnakangas <hlinnaka(at)iki(dot)fi>, Andres Freund <andres(at)anarazel(dot)de>, Pg Hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: BitmapHeapScan streaming read user and prelim refactoring
Date: 2025-04-14 16:58:19
Message-ID: CAJVSvF4k_sP7gv+H0bCyQfV9EE9Dmw5wfFDZ-V_mZPxFpNbJQw@mail.gmail.com
Lists: pgsql-hackers

On Thu, Apr 10, 2025 at 8:15 PM Thomas Munro <thomas(dot)munro(at)gmail(dot)com> wrote:
>
> On Fri, Apr 11, 2025 at 5:50 AM James Hunter <james(dot)hunter(dot)pg(at)gmail(dot)com> wrote:
> > I am looking at the pre-streaming code, in PG 17, as I am not familiar
> > with the PG 18 "streaming" code. Back in PG 17, nodeBitmapHeapscan.c
> > maintained two shared TBM iterators, for PQ. One of the iterators was
> > the actual, "fetch" iterator; the other was the "prefetch" iterator,
> > which kept some distance ahead of the "fetch" iterator (to hide read
> > latency).
>
> We're talking at cross-purposes.
>
> The new streaming BHS isn't just issuing probabilistic hints about
> future access obtained from a second iterator. It has just one shared
> iterator connected up to the workers' ReadStreams. Each worker pulls
> a disjoint set of blocks out of its stream, possibly running a bunch
> of IOs in the background as required. The stream replaces the old
> ReadBuffer() call, and the old PrefetchBuffer() call and a bunch of
> dubious iterator synchronisation logic are deleted.

Thanks for the clarification -- I realize I am very late to this
conversation (as I rejoined a PostgreSQL-related team only within the
past year), but I think this whole "ReadStream" project is misguided.

Of course, late feedback is a burden, but I think our discussion here
(and, in the future, if you try to "ReadStream" BTree index pages
themselves) illustrates my point. In this specific case, you are
proposing a lot of complexity, but it's not at all clear to me why.

I see two orthogonal problems, in processing Bitmap Heap pages in
parallel: (1) we need to prefetch enough pages, far enough in advance,
to hide read latency; (2) later, every parallel worker needs to be
given a set of pages to process, in a way that minimizes contention.

The easiest way to hand out work to parallel workers (and often the
best) is to maintain a single, shared, global work queue. Just put
whatever pages you prefetch into a FIFO queue, and let each worker
pull one piece of "work" off that queue. In this way, there's no
"ramp-down" problem.

If you find that contention on this shared queue becomes a
bottleneck, then you just pull *n* pieces of work at a time, in a
batch. The "ramp-down" problem is then limited to *n* blocks instead
of 1. Often, one can find a suitable value of *n* that simultaneously
makes contention effectively zero while avoiding "ramp-down" problems;
say *n* = 10.

So much for popping from the shared queue. Pushing to the shared queue
is also easy, because you have async reads. Anytime a worker pops a
(batch of) work item(s) off the shared queue, it checks to see if the
queue is still large enough. If not, it issues the appropriate
prefetch / "ReadStream" calls.
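
To make that concrete, here is a minimal sketch of the kind of queue
I have in mind. Every name in it is a hypothetical placeholder:
BlockQueue, bq_next_batch(), shared_stream_next_block(), and
start_async_read() stand in for whatever shared-memory structure,
shared TBM/ReadStream iterator, and async prefetch call we would
actually use, and a pthread mutex stands in for an LWLock or spinlock
in shared memory. (I refill before popping, rather than after as
described above, only so that the very first caller doesn't see an
empty queue; the two orderings are otherwise equivalent.)

/* Hypothetical sketch only -- placeholder names throughout, not PG code. */
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t BlockNumber;        /* stands in for PostgreSQL's BlockNumber */

#define QUEUE_CAPACITY  4096         /* ring size; power of two               */
#define LOW_WATERMARK    256         /* refill when fewer blocks than this    */
#define REFILL_COUNT     512         /* blocks to prefetch per refill         */

/* Placeholders for the shared iterator and the async prefetch facility. */
extern bool shared_stream_next_block(BlockNumber *blkno);
extern void start_async_read(BlockNumber blkno);    /* issue, don't await */

typedef struct BlockQueue
{
    pthread_mutex_t lock;            /* would be an LWLock/spinlock in shmem */
    BlockNumber     blocks[QUEUE_CAPACITY];
    uint32_t        head;            /* next slot to pop                     */
    uint32_t        tail;            /* next slot to push                    */
    bool            exhausted;       /* shared iterator has run dry          */
} BlockQueue;

static inline uint32_t
bq_count(const BlockQueue *q)
{
    return q->tail - q->head;        /* counters grow freely; masked on use  */
}

/*
 * Pop up to nwanted blocks for one worker.  If the queue has fallen below
 * the low watermark, first pull more block numbers from the shared iterator
 * and issue async reads for them, so reads are in flight long before any
 * worker needs the data.  Returns the number of blocks handed out.
 */
static int
bq_next_batch(BlockQueue *q, BlockNumber *out, int nwanted)
{
    int     npopped = 0;

    pthread_mutex_lock(&q->lock);

    /* Refill: keep the pipeline of in-flight reads deep enough. */
    if (!q->exhausted && bq_count(q) < LOW_WATERMARK)
    {
        for (int i = 0; i < REFILL_COUNT && bq_count(q) < QUEUE_CAPACITY; i++)
        {
            BlockNumber blkno;

            if (!shared_stream_next_block(&blkno))
            {
                q->exhausted = true;
                break;
            }
            start_async_read(blkno);                    /* fire and forget */
            q->blocks[q->tail++ % QUEUE_CAPACITY] = blkno;
        }
    }

    /* Hand this worker its next (small) batch of work. */
    while (npopped < nwanted && bq_count(q) > 0)
        out[npopped++] = q->blocks[q->head++ % QUEUE_CAPACITY];

    pthread_mutex_unlock(&q->lock);

    return npopped;
}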

A single shared queue is easiest, but sometimes there's no way to
prevent it from becoming a bottleneck. In that case, one typically
partitions the input at startup, gives each worker its own partition,
and waits for all workers to complete. In this second model, workers
are entirely independent, so there is no contention: we scale out
perfectly. The problem, as you've pointed out, is that one worker
might finish its own work long before another, and the worker that
finished early then sits idle and is therefore wasted.

This is why a single shared queue is so nice: it avoids workers
sitting idle. But I am confused by your proposal, which seems to be
trying to get the behavior of a single shared queue, implemented with
the added complexity of multiple queues.

Why not just use a single queue?

> These are now
> real IOs running in the background and for the *exact* blocks you will
> consume; posix_fadvise() was just a stepping towards AIO that
> tolerated sloppy synchronisation including being entirely wrong.

It has never been clear to me why prefetching the exact blocks you'll
later consume is seen as a *benefit*, rather than a *cost*. I'm not
aware of any prefetch interface, other than PG's "ReadStream," that
insists on this. But that's a separate discussion...

> If
> you additionally teach the iterator to work in batches, as my 0001
> patch (which I didn't propose for v18) showed, then one worker might
> end up processing (say) 10 blocks at end-of-scan while all the other
> workers have finished the node, and maybe the whole query.

The standard solution to the problem you describe here is to pick a
batch size small enough that you don't care. For example, no matter
what you do, it will always be possible for one worker to end up
processing *1* extra block at end of scan. If 1 is OK but 10 is too
large, then I would try 5. AFAIU, the argument for larger batches is
to reduce contention, and the argument for smaller batches is to
reduce the amount of time workers sit idle at end of query. The
problem seems amenable to a static solution -- there should be a value
of *n* that satisfies both restrictions.

Otherwise, we'd implicitly be saying that contention is very
expensive compared to the time it takes a worker to process a block
(so batches must be large), while also saying that each block takes
so long to process that end-of-query idle time matters (so batches
must be small).

For example, if you figure 10 us to process each block, then a batch
of size n = 10 gives each worker 100 us of CPU work between
acquisitions of the global mutex. Does this make contention low
enough? And then, if you figure that a parallel query has to run >=
100 ms for PQ to be worthwhile at all, then the "idle" time is at most
100 us / 100 ms = 0.1 percent, and therefore inconsequential.
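
Spelling out that arithmetic (all of these numbers are illustrative
assumptions, not measurements):

#include <stdio.h>

int
main(void)
{
    /* Illustrative assumptions, not measurements. */
    double us_per_block      = 10.0;            /* CPU time to process one block       */
    double batch_size        = 10.0;            /* blocks popped per mutex acquisition */
    double min_pq_runtime_us = 100.0 * 1000.0;  /* 100 ms: roughly where PQ pays off   */

    double us_between_locks = us_per_block * batch_size;            /* 100 us */
    double worst_idle_frac  = us_between_locks / min_pq_runtime_us; /* 0.001  */

    printf("CPU work per mutex acquisition: %.0f us\n", us_between_locks);
    printf("worst-case end-of-scan idle fraction: %.1f%%\n",
           worst_idle_frac * 100.0);
    return 0;
}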

> That'd be
> unfair. "Ramp-down" ... 8, 4, 2, 1 has been used in one or two other
> places in parallel-aware nodes with internal batching as a kind of
> fudge to help them finish CPU work around the same time if you're
> lucky, and my 0002 patch shows that NOT working here. I suspect the
> concept itself is defunct: it no longer narrows the CPU work
> completion time range across workers at all well due to the elastic
> streams sitting in between.

I think you are arguing that a single shared queue won't work, but
it's not clear to me why it won't. Maybe the problem is these elastic
streams?

What I don't understand is -- and I apologize again for interjecting
myself into this conversation so late -- why do you want or need
multiple streams? Logically, every PG scan is a single stream of
blocks, and parallel workers just pull batches of work off that single
stream.

Suppose the "ReadStream" order for a Bitmap Heap Scan is blocks: 0, 2,
1, 3, 4, 6, 5, 7, 8, 10, 9, 11, ... Why would we want or need to
affinitize reads to a particular worker? These are the blocks to be
read; this is the order in which they need to be read. Any worker can
issue any read, because we are using async I/O.

> Any naive solution that requires
> cooperation/waiting for another worker to hand over final scraps of
> work originally allocated to it (and I don't mean the IO completion
> part, that all just works just fine as you say, a lot of engineering
> went into the buffer manager to make that true, for AIO but also in
> the preceding decades...

You have lost me here. Every fork() / join() algorithm requires
waiting for a worker to hand over the final scraps of work allocated
to it, and AFAIK, no one has ever worried about deadlock risk from
fork() / join()...

> what I mean here is: how do you even know
> which block to read?) is probably a deadlock risk. Essays have been
> written on the topic if you are interested.

I think this may be where we are talking at cross purposes. Well,
reading a block is an async operation, right? And a parallel Bitmap
Heap Scan visits exactly the same pages as its serial variant.

Every worker acquires the shared queue's mutex. It pops n blocks off
the shared queue. If the shared queue is now too small, it iterates
the "ReadStream" to get *m* more page IDs, issues async reads for
those pages, and pushes them on the shared queue. The worker then
releases the mutex.
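
That is exactly the bq_next_batch() sketch from earlier in this
message, just with the steps labeled. Splitting its two halves into
hypothetical pop_locked() and refill_locked() helpers (both called
with the lock already held), the critical section looks like this:

static int
worker_next_batch(BlockQueue *q, BlockNumber *out, int n)
{
    int     got;

    pthread_mutex_lock(&q->lock);       /* "Every worker acquires the shared
                                         *  queue's mutex." */

    got = pop_locked(q, out, n);        /* "It pops n blocks off the shared
                                         *  queue." */

    if (bq_count(q) < LOW_WATERMARK)    /* "If the shared queue is now too
                                         *  small, it iterates the ReadStream
                                         *  to get m more page IDs, issues
                                         *  async reads for those pages, and
                                         *  pushes them on the shared queue." */
        refill_locked(q);

    pthread_mutex_unlock(&q->lock);     /* "The worker then releases the
                                         *  mutex." */

    return got;
}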

If by "read" you mean, "issue an async read," then you know which
block to read by visiting the (shared) "ReadStream".

If, instead, you mean "process a block," then you know because you
popped it off the shared queue.

> All the rest of our conversation makes no sense without that context :-)

I think I am still missing something, but I am not sure what. For
example, I can't see how deadlock would be a problem...

> > > I admit this all sounds kinda complicated and maybe there is a much
> > > simpler way to achieve the twin goals of maximising I/O combining AND
> > > parallel query fairness.
> >
> > I tend to think that the two goals are so much in conflict, that it's
> > not worth trying to apply cleverness to get them to agree on things...
>
> I don't give up so easily :-)

Now that I think about the problem more -- "maximizing I/O combining"
is orthogonal to "parallel query fairness," right?

Parallel query fairness involves splitting up CPU work into small
batches. Maximizing I/O combining involves making I/O more efficient
by issuing large batches.

However, there's no reason I can see why the *I/O batch* needs to
resemble the *CPU batch*. If maximizing I/O combining leads me to
issue a batch of 1,000 reads at a time, then so what? These are all
async reads! The "unlucky" worker issues its 1,000 reads, then gets on
with its life.

If parallel query fairness leads me to *process* a batch of 10 blocks
at a time -- then, again, so what? CPU and I/O are independent
resources.

Why not just prefetch / read-stream 1,000 blocks at a time, while
processing (using CPU) 10 blocks at a time? Wouldn't that achieve your
twin goals, without requiring multiple queues / streams / etc.?
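
In terms of the earlier sketch, those are simply two independent
knobs; the specific values below are illustrative, and shared_queue
and process_block() are placeholders for the shared structure and the
per-page TBM work:

#define CPU_BATCH      10     /* blocks a worker pops per mutex acquisition
                               * -- controls fairness */
#define IO_DEPTH     1000     /* async reads kept in flight ahead of all
                               * workers -- controls I/O combining; in the
                               * earlier sketch, size LOW_WATERMARK and
                               * REFILL_COUNT around this value */

extern BlockQueue shared_queue;                       /* the queue sketched above */
extern void       process_block(BlockNumber blkno);   /* placeholder */

void
worker_loop(void)
{
    BlockNumber batch[CPU_BATCH];
    int         got;

    while ((got = bq_next_batch(&shared_queue, batch, CPU_BATCH)) > 0)
        for (int i = 0; i < got; i++)
            process_block(batch[i]);
}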

Thanks,
James
