[RFC] ASOF Join

From: Alexander Kuzmenkov <akuzmenkov(at)timescale(dot)com>
To: pgsql-hackers(at)postgresql(dot)org
Subject: [RFC] ASOF Join
Date: 2021-11-18 14:11:16
Message-ID: CALzhyqwuVz0FJZ-oCYQ9d+yrPrbF5a9HDyAjxuSUdgq8n7nshQ@mail.gmail.com
Lists: pgsql-hackers

Hi hackers,

There was some interest in implementing ASOF joins in Postgres, see
e.g. this prototype patch by Konstantin Knizhnik:
https://www.postgresql.org/message-id/flat/bc494762-26bd-b100-e1f9-a97901ddad57%40postgrespro.ru
I'd like to discuss the possible ways of implementation, if there is
still any interest in that.

Introduction

ASOF join is often used to work with time series data such as stock
quotes or IoT sensors. It is an interpolation where we want to relate
two different time series measured at different points in time. For
each value of the first time series, we take the most recent value of
the second.

Besides an inequality condition on the timestamp, such a join can also
have equality conditions on other columns. For example, this query
joins two tables that contain bids and asks, finding the most recent
ask for each bid for the given financial instrument:

```sql
SELECT bids.ts timebid, bid, ask
FROM bids
ASOF JOIN asks ON bids.instrument = asks.instrument
AND asks.ts <= bids.ts;
```

Semantically, this is equivalent to the following correlated subquery:
```sql
SELECT bids.ts timebid, bid, ask
FROM bids,
LATERAL (select * from asks
WHERE asks.instrument = bids.instrument AND asks.ts <= bids.ts
ORDER BY ts DESC LIMIT 1) t;
```
This form is useful to think about which optimizations we can perform
with an ASOF join, how it behaves with respect to other joins, and so
on.
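To make the semantics concrete, here is a deliberately naive reference implementation of the inner ASOF join, mirroring the LATERAL form above. It is only a sketch: the tuple layout (ts, instrument, value) and the function name are assumptions made for illustration, not part of any proposal.

```python
def asof_join_reference(bids, asks):
    """Quadratic reference version of the inner ASOF join.

    bids, asks: lists of (ts, instrument, value) tuples, in any order.
    Returns a list of (bid_ts, bid_value, ask_value).
    """
    result = []
    for b_ts, b_instr, b_val in bids:
        # WHERE asks.instrument = bids.instrument AND asks.ts <= bids.ts
        candidates = [a for a in asks if a[1] == b_instr and a[0] <= b_ts]
        if not candidates:
            continue  # inner join: a bid with no earlier ask produces no row
        # ORDER BY ts DESC LIMIT 1
        latest = max(candidates, key=lambda a: a[0])
        result.append((b_ts, b_val, latest[2]))
    return result


bids = [(1, "A", 9.0), (5, "A", 10.0), (6, "B", 20.0)]
asks = [(2, "A", 10.5), (4, "A", 10.4), (7, "B", 20.5)]
print(asof_join_reference(bids, asks))  # [(5, 10.0, 10.4)]
```

Only the bid at ts = 5 has an earlier ask for the same instrument, and the ask at ts = 4 is the most recent one.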

QuestDB has some good docs on this with more examples:
https://questdb.io/docs/reference/sql/join/#asof-join

What Conditions Work with ASOF Join

Conditions for an ASOF join consist of one inequality condition (>=
etc), and optionally a number of equality conditions. All these
conditions must be "mergejoinable" in PG terms -- they must belong to
a btree operator family, which means there is a sorting operator that
corresponds to the condition, which means we can perform a merge join.
They also must support hashing because we'll probably need both
sorting and hashing for implementation (see below). This holds for the
usual data types like numeric. It is natural to think of the
inequality column as "time", but technically it can be any column,
even a string one, w/o changing the algorithm.

Join variants

The purpose of ASOF join is interpolation of one time series to match
another, so it is natural to think of it as an INNER join. The outer
variants might be less useful. Technically, it is easy to extend it to
LEFT ASOF JOIN, where we would output nulls for the right hand columns
if we haven’t yet seen a match. RIGHT and FULL variants also make
sense, but the implementation may be impossible, depending on the
algorithm -- merge and hash joins can support these variants, but the
nested loop cannot.

Use in Combination with Normal Joins

An ASOF join differs from a normal join in that, for the inequality
condition, it does not output all the rows that match it, but only the
most recent one. We can think of it as first performing a
normal join and then applying a filter that selects the latest right
hand row. Which row is the "latest" depends on the entire set of rows
that match the join conditions (same as with LIMIT). This means that
the result of ASOF join may depend on the place in the join tree where
it is evaluated, because other joins may remove some rows. Similar to
outer joins, we must respect the user-specified join order for an ASOF
join. It is useful to think about pushing another join below an ASOF
join as pushing a join below a correlated subquery with LIMIT (see
above). This transformation might be correct in some cases, so we
might later think about adding some optimization for join order for
ASOF join.
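A small worked example of why the join order matters. The relation kept_ids is hypothetical and stands in for any other inner join that removes some right-hand rows; the helper name and tuple layout are likewise assumptions of this sketch.

```python
def asof_latest(left, right):
    """For each left (ts, key), pick the latest right (ts, key, id)
    with the same key and right ts <= left ts (inner ASOF join)."""
    out = []
    for l_ts, l_key in left:
        matches = [r for r in right if r[1] == l_key and r[0] <= l_ts]
        if matches:
            out.append(((l_ts, l_key), max(matches, key=lambda r: r[0])))
    return out


left = [(10, "A")]
right = [(3, "A", "r1"), (7, "A", "r2")]
kept_ids = {"r1"}  # hypothetical third relation: joining with it keeps only r1

# Plan 1: join the right side with the third relation first, then ASOF join.
filtered_right = [r for r in right if r[2] in kept_ids]
plan1 = asof_latest(left, filtered_right)

# Plan 2: ASOF join first, then join the result with the third relation.
plan2 = [(l, r) for l, r in asof_latest(left, right) if r[2] in kept_ids]

print(plan1)  # [((10, 'A'), (3, 'A', 'r1'))]
print(plan2)  # []
```

In plan 1 the row r1 becomes the "latest" one because r2 was already removed. In plan 2 the ASOF join picks r2, which the other join then discards, so the two plans disagree.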

Proposed Syntax

ASOF join is semantically distinct from a normal join on the same
conditions, so it requires separate grammar. An ASOF modifier, with
all the conditions listed in the ON clause, looks like a good baseline:
`bids ASOF JOIN asks ON asks.timestamp <= bids.timestamp AND
asks.instrument = bids.instrument`

Algorithms

Let's see which algorithm we can use to perform an ASOF join if we
have a "<=" condition on timestamp and several "=" conditions on other
columns (equi-columns).

1. Hash on Equi-keys

This is what ClickHouse uses. It builds a hash table on equi columns,
then for each equi-key builds an array of timestamps, sorted on
demand. This requires bringing the entire right hand table into
memory, so it is not feasible for large tables.
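A rough sketch of this scheme, as I understand the description above. This is not ClickHouse's actual code; the names and the (ts, key, value) row layout are assumptions for illustration.

```python
from bisect import bisect_right
from collections import defaultdict


def asof_join_hash(left, right):
    """left, right: lists of (ts, key, value) in any order.
    Returns (left_ts, key, left_value, right_value) for each match."""
    # Build phase: the whole right input ends up in memory.
    by_key = defaultdict(list)
    for ts, key, val in right:
        by_key[key].append((ts, val))

    sorted_arrays = {}  # key -> (timestamps, values), sorted lazily on first probe
    out = []
    for l_ts, l_key, l_val in left:
        if l_key not in by_key:
            continue
        if l_key not in sorted_arrays:
            rows = sorted(by_key[l_key], key=lambda r: r[0])
            sorted_arrays[l_key] = ([r[0] for r in rows], [r[1] for r in rows])
        ts_list, vals = sorted_arrays[l_key]
        # Number of right rows with ts <= l_ts; the last of them is the match.
        i = bisect_right(ts_list, l_ts)
        if i > 0:
            out.append((l_ts, l_key, l_val, vals[i - 1]))
    return out


left = [(5, "A", "b1"), (1, "A", "b0"), (6, "B", "b2")]
right = [(4, "A", 10.4), (2, "A", 10.5), (7, "B", 20.5)]
print(asof_join_hash(left, right))  # [(5, 'A', 'b1', 10.4)]
```

Neither input has to be sorted, which is the attraction of this approach. The cost is the in-memory build side.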

2. Merge Join on (equi-keys, timestamp) Sorting

This is a natural extension of the merge join algorithm, but instead
of returning all keys for the timestamp column, it returns only the
latest one. A drawback of this algorithm is that the data must be
sorted on timestamp last, so we can't reuse the natural ordering of
the time series data encoded by a (timestamp) index. We will have to
sort both tables entirely in different order, which is prohibitively
costly for large tables. Another way is to create an index on
(equi-keys, timestamp). This would allow us to perform a merge ASOF
join in linear time, but has several drawbacks. First, it requires
maintaining an additional index which costs space and time (the
(timestamp) index we have to have anyway). Second, the time series
data is naturally ordered on timestamp, so even w/o CLUSTER, the
locality in time translates somewhat into the locality in page space.
Reading the table in (equi-keys, timestamp) order would require
essentially random access with frequent switching between chunks, in
contrast to reading in (timestamp) order which reads from a single
chunk. So this algorithm is probably going to be less performant than
the one using (timestamp) sorting, described next. The good part of
this algorithm is that with a dedicated (equi-keys, timestamp) index,
it requires constant memory, so it still can be useful in case of high
cardinality of equi-keys.
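A minimal sketch of the merge variant, assuming both inputs already arrive sorted on (equi-key, timestamp), for instance from the dedicated index. The row layout (key, ts, value) and the function name are assumptions of the sketch.

```python
def asof_join_merge(left, right):
    """left, right: lists of (key, ts, value), both sorted by (key, ts).
    Returns (key, left_ts, left_value, right_value) for each match."""
    out = []
    j = 0
    candidate = None  # last right row whose (key, ts) <= current left (key, ts)
    for l_key, l_ts, l_val in left:
        # Advance the right side while it does not overtake the left row
        # in (key, ts) order. The pointer never moves backward.
        while j < len(right) and (right[j][0], right[j][1]) <= (l_key, l_ts):
            candidate = right[j]
            j += 1
        # The candidate is a match only if it belongs to the same key.
        if candidate is not None and candidate[0] == l_key:
            out.append((l_key, l_ts, l_val, candidate[2]))
    return out


left = [("A", 1, "b0"), ("A", 5, "b1"), ("B", 6, "b2"), ("C", 9, "b3")]
right = [("A", 2, 10.5), ("A", 4, 10.4), ("B", 7, 20.5), ("C", 8, 30.0)]
print(asof_join_merge(left, right))
# [('A', 5, 'b1', 10.4), ('C', 9, 'b3', 30.0)]
```

Besides the inputs themselves, the only state is one candidate row and one position, which is where the constant memory claim comes from.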

3. Merge-Hash on (timestamp) Sorting

If we sort first on timestamp, we can reuse the natural order of
time-series data, often encoded by the index on (timestamp). This
approach would allow us to process data in streaming fashion, w/o
sorting everything again, which makes it feasible for really large
tables. Let's see what algorithm we can use to perform an ASOF join in
this case. Suppose we have left and right input streams sorted on
(timestamp). We will need to use an additional data structure -- a
hash table indexed by the equi keys. The algorithm is as follows:

a. For a given left row, advance the right table until
right timestamp > left timestamp.

b. While we advance the right table, put each right hand row into the
hash table indexed by the equi keys. Overwrite the previous row with
the same keys, if there was any.

c. We have finished advancing the right table. The hash table now
contains the most recent right hand row for every value of equi-keys.
Most recent because the right hand table is sorted by (timestamp).

d. For the left row, look up a right row that matches it by the equi
keys in the hash table. This is the right hand row that matches the
ASOF join conditions (equi-keys are equal, left timestamp >= right
timestamp, right timestamp is maximal for the given equi-keys). Output
the result.

e. Go to the next left row. The left table is also sorted on
(timestamp), so we won't need to rewind the right table, only to
advance it forward.

Given the sorted input paths, this algorithm is linear in the size of
the tables. A drawback of this algorithm is that it requires memory
proportional to the cardinality of the equi-columns. A possible
optimization is to split the equi-key hash table into hot and cold
parts by LRU, and dump the cold part to disk. This would help if each
equi-key only occurs for a small period of time.
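Steps a to e can be sketched as follows. This is an in-memory illustration only, with plain lists standing in for the sorted input paths and a dict standing in for the hash table; the row layout (ts, key, value) and the names are assumptions, and the hot/cold spilling idea is not shown.

```python
def asof_join_merge_hash(left, right):
    """left, right: lists of (ts, key, value), both sorted by ts.
    Returns (left_ts, key, left_value, right_ts, right_value) per match."""
    latest = {}  # equi-key -> (ts, value) of the most recent right row seen
    out = []
    j = 0
    for l_ts, l_key, l_val in left:
        # Steps a and b: advance the right input until right ts > left ts,
        # storing every row we pass and overwriting older rows per key.
        while j < len(right) and right[j][0] <= l_ts:
            r_ts, r_key, r_val = right[j]
            latest[r_key] = (r_ts, r_val)
            j += 1
        # Steps c and d: the table now holds the most recent right row
        # for every key, so one lookup finds the match, if there is one.
        match = latest.get(l_key)
        if match is not None:
            out.append((l_ts, l_key, l_val, match[0], match[1]))
        # Step e: continue with the next left row; j only moves forward.
    return out


left = [(1, "A", "b0"), (5, "A", "b1"), (6, "B", "b2"), (9, "A", "b3")]
right = [(2, "A", 10.5), (3, "B", 20.1), (4, "A", 10.4), (7, "B", 20.5)]
print(asof_join_merge_hash(left, right))
# [(5, 'A', 'b1', 4, 10.4), (6, 'B', 'b2', 3, 20.1), (9, 'A', 'b3', 4, 10.4)]
```

Each input row is visited once, and the dict holds at most one row per distinct equi-key, which matches the time and memory bounds stated above.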

4. Nested Loop

An efficient nested loop plan has to have a fast right-side subplan,
such as an index lookup. Unfortunately, there seems to be no way to
efficiently perform a last-point lookup for given equi-keys, if we
have separate btree indexes on timestamp and equi-keys. The nested
loop plan could work if we have an (equi-keys, timestamp) btree index,
where the equality columns lead and the last point is found by a
backward scan on timestamp.

Prototype Implementation

For a prototype, I'd go with #3 "merge-something with a hash table of
most recent rows for equi-keys", because it works for big tables and
can reuse the physical data ordering.

I'll be glad to hear your thoughts on this.

--
Alexander Kuzmenkov
Timescale
