Re: Avoiding hash join batch explosions with extreme skew and weird stats

From: Alena Rybakina <a(dot)rybakina(at)postgrespro(dot)ru>
To: Melanie Plageman <melanieplageman(at)gmail(dot)com>
Cc: Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Jeff Davis <pgsql(at)j-davis(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Jesse Zhang <sbjesse(at)gmail(dot)com>, Heikki Linnakangas <hlinnaka(at)iki(dot)fi>, david(dot)g(dot)kimura(at)gmail(dot)com, soumyadeep2007(at)gmail(dot)com
Subject: Re: Avoiding hash join batch explosions with extreme skew and weird stats
Date: 2024-11-10 17:55:17
Message-ID: 29e4977d-6b2b-4910-ab38-dc64babd732b@postgrespro.ru
Lists: pgsql-hackers

Hi!

Thank you for your work on this problem!

On 01.09.2020 01:13, Melanie Plageman wrote:
> Attached is the current version of adaptive hash join with two
> significant changes as compared to v10:
>
> 1) Implements spilling of batch 0 for parallel-aware parallel hash join.
> 2) Moves "striping" of fallback batches from "build" to "load" stage
> It includes several smaller changes as well.
>
> Batch 0 spilling is necessary when the hash table for batch 0 cannot fit
> in memory; it allows us to use the "hashloop" strategy for batch 0.
>
> Spilling of batch 0 necessitated the addition of a few new pieces of
> code. The most noticeable one is probably the hash table eviction phase
> machine. If batch 0 was marked as a "fallback" batch in
> ExecParallelHashIncreaseNumBatches() PHJ_GROW_BATCHES_DECIDING phase,
> any future attempt to insert a tuple that would exceed the space_allowed
> triggers eviction of the hash table.
> ExecParallelHashTableEvictBatch0() will evict all batch 0 tuples in
> memory into spill files in a batch 0 inner SharedTuplestore.
>
> This means that when repartitioning batch 0 in the future, both the
> batch 0 spill file and the hash table need to be drained and relocated
> into the new generation of batches and the hash table. If enough memory
> is freed up from batch 0 tuples relocating to other batches, then it is
> possible that tuples from the batch 0 spill files will go back into the
> hash table.
> After batch 0 is evicted, the build stage proceeds as normal.
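
To make sure I follow the eviction flow, here is a minimal standalone
sketch of how I read it (all type and function names below are stand-ins
of mine, not the patch's actual code):

/* Walk the in-memory chunks of the batch 0 hash table and push every
 * tuple into a spill file (sts_puttuple() in the real patch), freeing
 * each chunk as we go; the build stage then continues as usual. */
#include <stddef.h>
#include <stdlib.h>

typedef struct Tuple { int unused; } Tuple;

typedef struct MemoryChunk
{
    struct MemoryChunk *next;   /* the hash table's chunk list */
    int     ntuples;
    Tuple   tuples[];           /* tuples are stored inside the chunk */
} MemoryChunk;

typedef struct HashTable
{
    MemoryChunk *chunks;
    size_t       space_used;
} HashTable;

static void spill_tuple(Tuple *tup) { (void) tup; }  /* sts_puttuple stand-in */

static void
evict_batch0(HashTable *ht)
{
    MemoryChunk *chunk = ht->chunks;

    while (chunk != NULL)
    {
        MemoryChunk *next = chunk->next;

        for (int i = 0; i < chunk->ntuples; i++)
            spill_tuple(&chunk->tuples[i]);

        free(chunk);            /* the freed memory can absorb later tuples */
        chunk = next;
    }
    ht->chunks = NULL;
    ht->space_used = 0;
}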
>
> The main alternative to this design that we considered was to "close" the
> hash table after it is full. That is, if batch 0 has been marked to fall
> back, once it is full, all subsequent tuples pulled from the outer child
> would bypass the hash table altogether and go directly into a spill
> file.
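
If I understand the rejected alternative correctly, its insert path
would have been roughly the following (stand-in names again):

/* Sketch of the "closed table" design: once the fallback batch 0 table
 * fills up it is closed for good, and every later tuple bypasses it and
 * goes straight to a spill file, one tuple at a time. */
#include <stdbool.h>
#include <stddef.h>

typedef struct Tuple { size_t size; } Tuple;

static void spill_tuple(Tuple *tup) { (void) tup; }   /* spill-file write */
static void table_insert(Tuple *tup) { (void) tup; }  /* in-memory insert */

static bool   table_closed = false;
static size_t space_used = 0;

static void
insert_or_bypass(Tuple *tup, size_t space_allowed)
{
    /* once the fallback table fills up, it never reopens */
    if (!table_closed && space_used + tup->size > space_allowed)
        table_closed = true;

    if (table_closed)
        spill_tuple(tup);       /* bypass the hash table entirely */
    else
    {
        table_insert(tup);
        space_used += tup->size;
    }
}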
>
> We chose the hash table eviction route because I thought it might be
> better to write chunks of the hashtable into a file together rather than
> sporadically write new batch 0 tuples to spill files as they are
> pulled out of the child node. However, since the same sts_puttuple() API
> is used in both cases, it is highly possible this won't actually matter
> and we will do the same amount of I/O.
> Both designs involved changing the flow of the code for inserting and
> repartitioning tuples, so I figured that I would choose one, do some
> testing, and try the other one later after more discussion and review.
>
> This patch also introduces a significant change to how tuples are split
> into stripes. Previously, during the build stage, tuples were written to
> spill files in the SharedTuplestore with a stripe number in the metadata
> section of the MinimalTuple.
> For a batch that had been designated a "fallback" batch,
> once the space_allowed had been exhausted, the shared stripe number
> would be incremented and the new stripe number was written in the tuple
> metadata to the files. Then, during loading, tuples were only loaded
> into the hashtable if their stripe number matched the current stripe
> number.
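
As a sketch, this is how I picture that previous build-time striping
(stand-in names; synchronization of the shared counter is elided):

#include <stdbool.h>

typedef struct SpilledTuple
{
    int stripe;                 /* stored in the tuple's metadata section */
    /* ... MinimalTuple body would follow ... */
} SpilledTuple;

static int shared_stripe = 0;   /* shared across workers in the real code */

/* build stage: tag each spilled tuple with the current stripe number */
static void
tag_tuple(SpilledTuple *tup, bool space_exhausted)
{
    if (space_exhausted)
        shared_stripe++;        /* shared-variable access on the hot path */
    tup->stripe = shared_stripe;
}

/* load stage: only tuples of the current stripe enter the hash table */
static bool
should_load(const SpilledTuple *tup, int current_stripe)
{
    return tup->stripe == current_stripe;
}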
>
> This had several downsides. It introduced a couple of new shared variables --
> the current stripe number for the batch and its size.
> In master, during the normal mode of the "build" stage, shared variables
> for the size or estimated_size of the batch are checked on each
> allocation of an STS Chunk or HashMemoryChunk. During repartitioning,
> however, because bailing out early was not an option, workers could use
> backend-local variables to keep track of size and merge them at the end
> of repartitioning. This wasn't possible once we needed accurate stripe
> numbers written into the tuples, which meant we had to add new shared
> variable accesses to repartitioning.
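
For comparison, the backend-local bookkeeping on master that accurate
stripe numbers would break looks roughly like this (stand-ins; the
synchronization of the final merge is elided):

#include <stddef.h>

typedef struct SharedBatch
{
    size_t size;                /* updated atomically/under a lock in reality */
} SharedBatch;

static void
repartition_worker(SharedBatch *shared, const size_t *tuple_sizes, int ntuples)
{
    size_t local_size = 0;      /* backend-local: no shared accesses per tuple */

    for (int i = 0; i < ntuples; i++)
        local_size += tuple_sizes[i];

    shared->size += local_size; /* one merge at the very end */
}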
>
> To avoid this, Deep and I worked on moving the "striping" logic from the
> "build" stage to the "load" stage for batches. Serial hash join already
> did striping in this way. This patch now pauses loading once the
> space_allowed has been exhausted for parallel hash join as well. The
> tricky part was keeping track of multiple read_pages for a given file.
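
So, if I read the patch right, loading a fallback batch now behaves
roughly like this (stand-in names; the real code reads block assignments
from the SharedTuplestore):

#include <stdbool.h>
#include <stddef.h>

typedef struct Tuple { size_t size; } Tuple;

static bool next_tuple(Tuple *tup) { (void) tup; return false; } /* STS read */
static void table_insert(Tuple *tup) { (void) tup; }

static size_t space_used = 0;

/* Load one stripe; returns true if we paused with tuples left over,
 * meaning this stripe is joined now and loading resumes afterwards. */
static bool
load_stripe(size_t space_allowed)
{
    Tuple tup;

    while (next_tuple(&tup))
    {
        if (space_used + tup.size > space_allowed)
            return true;        /* pause: this tuple belongs to a later stripe
                                 * and must be parked somewhere, which is the
                                 * read_page problem described below */
        table_insert(&tup);
        space_used += tup.size;
    }
    return false;               /* whole batch fit; no more stripes */
}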
>
> When tuples had explicit stripe numbers, we simply rewound the read_page
> in the SharedTuplestoreParticipant to the earliest SharedTuplestoreChunk
> that anyone had read and relied on the stripe numbers to avoid loading
> tuples more than once. Now, each worker participating in reading from
> the SharedTuplestore could have received a read_page "assignment" (four
> blocks, currently) and then failed to allocate a HashMemoryChunk. We
> cannot risk rewinding the read_page because there could be
> SharedTuplestoreChunks that have already been loaded in between ones
> that have not.
>
> The design we went with was to "overflow" the tuples from this
> SharedTuplestoreChunk onto the end of the write_file this worker
> wrote--if it participated in writing this STS--or into a new
> write_file if it did not participate in writing. This entailed keeping
> track of who participated in the write phase. SharedTuplestore
> participation now has three "modes"-- reading, writing, and appending.
> During appending, workers can write to their own file and read from any
> file.
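
So participation boils down to something like the following (an
illustrative enum, not the patch's actual identifiers):

#include <stdbool.h>

typedef enum StsMode
{
    STS_READING,                /* consume block assignments from any file */
    STS_WRITING,                /* build stage: write to this worker's file */
    STS_APPENDING               /* overflow: write own file, read any file */
} StsMode;

static bool can_read_any_file(StsMode m)  { return m == STS_READING || m == STS_APPENDING; }
static bool can_write_own_file(StsMode m) { return m == STS_WRITING || m == STS_APPENDING; }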
>
> One of the alternative designs I considered was to store the offset and
> length of leftover blocks that still needed to be loaded into the hash
> table in the SharedTuplestoreParticipant data structure. Then, workers
> would pick up these "assignments". It is basically a
> SharedTuplestoreParticipant work queue.
> The main stumbling block I faced here was allocating a variable number of
> things in shared memory. You don't know how many read participants will
> read from the file and how many stripes there will be (until you've
> loaded the file). In the worst case, you would need space for
> nparticipants * nstripes - 1 offset/length combos.
> Since I don't know how many stripes I have until I've loaded the file, I
> can't allocate shared memory for this up front.
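
In other words, the shared array would have to be sized before its size
is knowable (illustrative sketch):

#include <stddef.h>

typedef struct LeftoverAssignment
{
    size_t offset;              /* start of blocks still to be loaded */
    size_t nblocks;
} LeftoverAssignment;

/* Worst case from above: nparticipants * nstripes - 1 slots; but
 * nstripes is only known after the file has been fully loaded, which
 * is after the shared memory would already have been needed. */
static size_t
worst_case_slots(int nparticipants, int nstripes)
{
    return (size_t) nparticipants * nstripes - 1;
}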
>
> The downside of the "append overflow" design is that, currently, all
> workers participating in loading a fallback batch write an overflow
> chunk for every fallback stripe.
> It seems like something could be done to check if there is space in the
> hashtable before accepting an assignment of blocks to read from the
> SharedTuplestore and moving the shared variable read_page. It might
> reduce instances in which workers have to overflow. However, I tried
> this and it is very intrusive on the SharedTuplestore API (it would have
> to know about the hash table). Also, oversized tuples would not be
> addressed by this pre-assignment check since memory is allocated a
> HashMemoryChunk at a time. So, even if this was solved, you would need
> overflow functionality.
>
> One note is that I had to comment out a test in join_hash.sql which
> inserts tuples larger than work_mem in size (each), because it no longer
> successfully executes.
> Also, the stripe number is not deterministic, so sometimes the tests that
> compare fallback batches' number of stripes fail (also in join_hash.sql).
>
> Major outstanding TODOs:
> --
> - Potential redesign of stripe loading pausing and resumption
> - The instrumentation for parallel fallback batches has some problems
> - Deadlock hazard avoidance design of the stripe barrier still needs work
> - Assorted smaller TODOs in the code

I think this patch is essential: it saves us from allocating an
incredibly large amount of memory when doing a hash join.
Unfortunately, we still cannot avoid incorrect cardinality estimates
or reliably estimate NULL elements, and both can make a batch in the
hash table grow very large. A client of mine recently ran into exactly
this problem: his system allocated 50 GB of memory while performing a
hash join, and only your patch helped to avoid that. Luckily for me he
was running Postgres 15, but his case reproduces with the same problem
on newer versions.

I noticed that your patch has fallen far behind master, so I made an
attempt to rebase it and revive the discussion. However, I'm stuck on
a cluster initialization problem that I could not solve. Can you take
a look and tell me what is wrong here?

export CDIR=$(pwd)
export PGDATA=/home/alena/postgres_data11
my/inst/bin/pg_ctl -D $PGDATA -l logfile stop
rm -r $PGDATA
mkdir $PGDATA
my/inst/bin/initdb -D $PGDATA >> log.txt
my/inst/bin/pg_ctl -D $PGDATA -l logfile start

pg_ctl: directory "/home/alena/postgres_data11" is not a database
cluster directory
2024-11-10 20:44:40.598 MSK [20213] FATAL:  duplicate key value violates
unique constraint "pg_description_o_c_o_index"
2024-11-10 20:44:40.598 MSK [20213] DETAIL:  Key (objoid, classoid,
objsubid)=(378, 1255, 0) already exists.
2024-11-10 20:44:40.598 MSK [20213] STATEMENT:
        WITH funcdescs AS ( SELECT p.oid as p_oid, o.oid as o_oid,
oprname FROM pg_proc p JOIN pg_operator o ON oprcode = p.oid ) INSERT
INTO pg_description   SELECT p_oid, 'pg_proc'::regclass, 0,    
'implementation of ' || oprname || ' operator'   FROM funcdescs   WHERE
NOT EXISTS (SELECT 1 FROM pg_description    WHERE objoid = p_oid AND
classoid = 'pg_proc'::regclass)   AND NOT EXISTS (SELECT 1 FROM
pg_description    WHERE objoid = o_oid AND classoid =
'pg_operator'::regclass         AND description LIKE 'deprecated%');

child process exited with exit code 1

--
Regards,
Alena Rybakina
Postgres Professional

Attachment Content-Type Size
skew_data.diff text/x-patch 371.1 KB
