From: Hubert Zhang <hzhang(at)pivotal(dot)io>
To: Robert Haas <robertmhaas(at)gmail(dot)com>
Cc: Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Melanie Plageman <melanieplageman(at)gmail(dot)com>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: accounting for memory used for BufFile during hash joins
Date: 2019-08-14 10:30:26
Message-ID: CAB0yremvswRAT86Afb9MZ_PaLHyY9BT313-adCHbhMJ=x_GEcg@mail.gmail.com
Lists: pgsql-hackers
On Fri, Jul 12, 2019 at 1:16 AM Robert Haas <robertmhaas(at)gmail(dot)com> wrote:
> On Mon, May 6, 2019 at 9:49 PM Thomas Munro <thomas(dot)munro(at)gmail(dot)com>
> wrote:
> > Stepping back a bit, I think there is something fishy about the way we
> > detect extreme skew. Is that a factor in this case? Right now we
> > wait until we have a batch that gets split into child batches
> > containing exactly 0% and 100% of the tuples before we give up.
> > Previously I had thought of that as merely a waste of time, but
> > clearly it's also a waste of unmetered memory. Oops.
> >
> > I think our extreme skew detector should go off sooner, because
> > otherwise if you have N nicely distributed unique keys and also M
> > duplicates of one bad egg key that'll never fit in memory, we keep
> > repartitioning until none of the N keys fall into the batch containing
> > the key for the M duplicates before we give up! You can use
> > balls-into-bins maths to figure out the number, but I think that means
> > we expect to keep splitting until we have N * some_constant batches,
> > and that's just silly and liable to create massive numbers of
> > partitions proportional to N, even though we're trying to solve a
> > problem with M. In another thread I suggested we should stop when
> > (say) 95% of the tuples go to one child batch. I'm not sure how you
> > pick the number.
>
> Another thing that is fishy about this is that we can't split a batch
> or a bucket without splitting them all. Let's say that nbatches *
> nbuckets = 16 million. One bucket in one batch contains 90% of the
> tuples. Splitting *that* bucket might be a good idea if only 5% of the
> tuples end up moving, perhaps even if only 1% end up moving. But, if
> you have to double the total number of batches to get that benefit,
> it's a lot less compelling, because now you have to rescan the outer
> side more times.
> It seems to me that a good chunk of what's being proposed right now
> basically ignores the fact that we're not really responding to the
> skew in a very effective way. Thomas wants to stop splitting all the
> buckets when splitting one of the buckets produces only a very small
> benefit rather than when it produces no benefit at all, but he's not
> asking why we're splitting all of the buckets in the first place.
> Tomas wants to slice the array of batches because there are so many of
> them, but why are there so many? As he said himself, "it gets to that
> many batches because some of the values are very common and we don't
> disable the growth earlier." Realistically, I don't see how there can
> be so many batches that we can't even fit the metadata about the
> matches into memory unless we're unnecessarily creating a lot of
> little tiny batches that we don't really need.
>
>
+1 on Robert's suggestion. It's worth finding the root cause of the batch
explosion problem.
As Robert pointed out, "we can't split a batch or a bucket without splitting
them all". In fact, a hybrid hash join algorithm should only split the
overflowing batch and avoid splitting the small batches which could be
processed in memory. The planner should calculate an initial batch number
which ensures that an average-sized batch can be processed in memory given
the data distribution, and the executor should split skewed batches in a
one-batch-at-a-time manner.
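For context, here is a minimal sketch of how such an initial batch count
could be chosen so that the average batch fits in work_mem. The helper name
choose_initial_nbatch and the numbers are made up for illustration; the real
logic lives in ExecChooseHashTableSize and also accounts for bucket headers,
the skew optimization, etc.
```
#include <stdio.h>

/*
 * Hypothetical helper: double the batch count until the average batch
 * fits in work_mem.  Sketch only, not the actual planner code.
 */
static int
choose_initial_nbatch(double inner_rel_bytes, double work_mem_bytes)
{
    int nbatch = 1;

    while (inner_rel_bytes / nbatch > work_mem_bytes)
        nbatch <<= 1;           /* keep nbatch a power of 2 */

    return nbatch;
}

int
main(void)
{
    /* e.g. a 1GB inner relation with 4MB of work_mem needs 256 batches */
    printf("%d\n",
           choose_initial_nbatch(1024.0 * 1024 * 1024, 4.0 * 1024 * 1024));
    return 0;
}
```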
First, let me show an example to help understand the batch explosion problem.
Suppose we are going to join R and S, and the planner calculates the initial
nbatch as 4.
In the first batch run, during the HJ_BUILD_HASHTABLE state we scan R, build
the in-memory hash table for batch 1, and spill the other tuples of R into
the batch files R2-R4. During the HJ_NEED_NEW_OUTER and HJ_SCAN_BUCKET states,
we do two things: 1. if a tuple in S belongs to the current batch, match it
against the in-memory R1 and emit the result to the parent plan node; 2. if a
tuple in S doesn't belong to the current batch, spill it to one of the batch
files S2-S4. As a result, after the first batch run we get
6 disk files: batch2(R2,S2), batch3(R3,S3), batch4(R4,S4)
Now we run into the HJ_NEED_NEW_BATCH state and begin to process R2 and S2.
Suppose the second batch R2 is skewed and needs to increase the batch number
to 8. When building the in-memory hash table for R2, we also split some
tuples of R2 into the spill file R6 (based on our hash function, tuples
belonging to R2 will not be shuffled to any batch other than R6). After R2's
hash table is built, we begin to probe the tuples in S2. Since the batch
number has changed from 4 to 8, some of the tuples in S2 now belong to S6 and
we spill them to the disk file S6. The other tuples in S2 are matched against
R2 and the results are emitted to the parent plan node. After the second
batch is processed, we get these
disk files: batch3(R3,S3), batch4(R4,S4), batch6(R6,S6)
Next, we begin to process R3 and S3. The third batch R3 is not skewed, but
since our hash function depends on the batch number, which is now 8, we have
to split some tuples of R3 into the disk file R7, *which is not necessary*.
When probing S3, we also need to split some tuples of S3 into S7, *which is
not necessary either*. Since R3 could have been loaded into memory entirely,
spilling part of R3 to a disk file not only introduces more files and file
buffers (which is the problem Tomas is trying to solve), but also slows down
performance. After the third batch is processed, we get these
disk files: batch4(R4,S4), batch6(R6,S6), batch7(R7,S7)
Next, we begin to process R4 and S4. Similar to R3, some tuples in R4 also
need to be spilled to the file R8. But after this split, suppose R4 is still
skewed, and we increase the batch number again to 16. As a result, some
tuples of R4 will be spilled to the files R12 and R16. When probing S4, we
similarly need to split some tuples of S4 into S8, S12 and S16. After this
step, we get these disk files:
batch6(R6,S6), batch7(R7,S7), batch8(R8,S8), batch12(R12,S12), batch16(R16,S16)
Next, when we begin to process R6 and S6, even though we could build the hash
table for R6 entirely in memory, we still have to split R6 based on the new
batch number 16 and spill some tuples to the file R14. *It's not necessary.*
Now we can conclude that increasing the batch number introduces unnecessary
repeated spilling, not only for the original batches (R3,S3) but also for
newly generated batches (R6,S6), in a cascading way. *In a worst case,
suppose R2 is extremely skewed and needs to be split 10 times, while R3 could
build its hash table entirely in memory. In this case, we have to introduce
R7, R11, ..., R4095, in total 1023 unnecessary spill files. Each of these
files may contain fewer than ten tuples. Also, we need to palloc a file
buffer (512KB) for each of these spill files. This is the so-called batch
explosion problem.*
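To make the cost concrete, here is a small back-of-the-envelope check of
those numbers. This is a standalone sketch only: the 512KB per-file buffer
size is the figure discussed earlier in this thread, and the batch numbering
follows the convention used above (R3's siblings are R7, R11, ..., R4095).
```
#include <stdio.h>

int
main(void)
{
    int initial_nbatch = 4;
    int splits = 10;                               /* R2 is split 10 times */
    int final_nbatch = initial_nbatch << splits;   /* 4 * 2^10 = 4096 */
    int files = 0;

    /*
     * With the current hash function, R3's tuples (all 3 MOD 4) can end up
     * in any batch b with b MOD 4 == 3, i.e. R7, R11, ..., R4095.
     */
    for (int b = 1; b <= final_nbatch; b++)
        if (b % initial_nbatch == 3 && b != 3)
            files++;

    printf("unnecessary spill files for R3: %d\n", files);   /* 1023 */
    printf("file buffer memory: %d KB\n", files * 512);      /* ~511 MB */
    return 0;
}
```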
*Solutions:*
To avoid this unnecessary repeated spilling, I propose to turn the function
ExecHashGetBucketAndBatch
into a hash function chain to determine the batchno.
Here is the original implementation of ExecHashGetBucketAndBatch:
```
/* nbatch is the current global batch number */
*batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
```
We can see that the original hash function basically computes a MOD of the
current global batch number.
A real hybrid hash join should use a hash function chain to determine the
batchno. In the new algorithm, the components of the hash function chain are
defined as: MOD of IBN, MOD of IBN*2, MOD of IBN*4, MOD of IBN*8, ... etc.,
where IBN is the initial batch number. A small batch will just use the first
hash function in the chain, while a skewed batch will use as many hash
functions in the chain as the number of times it has been split.
Here is the new implementation of ExecHashGetBucketAndBatch:
```
/* i is the current batchno we are processing */
/* hashChainLen[i] records the number of times batch i has been split */
for (j = 0; j < hashChainLen[i]; j++)
{
    batchno = (hashvalue >> hashtable->log2_nbuckets) &
              (initialBatch * (1 << j) - 1);

    /*
     * If the calculated batchno is still i, we need to apply more hash
     * functions in the chain to determine the final batchno; otherwise we
     * can return directly.
     */
    if (batchno != i)
        return batchno;
}
return batchno;
```
A quick example: suppose R3's input is 3,7,11,15,19,23,27,31,35,15,27 (all of
them are 3 MOD 4).
Suppose the initial batch number is 4 and memory can contain 4 tuples, so the
5th tuple forces a batch split.
Step 1: batch3 processes 3,7,11,15,19 and now needs to split:
chainLen[3]=2
batch3: 3,11,19
batch7: 7,15
Step 2: 23,27,31 arrive:
batch3: 3,11,19,27
batch7: 7,15,23,31
Step 3: 35 arrives, batch3 needs to split again:
chainLen[3]=3
batch3: 3,19,35
batch7: 7,15,23,31
batch11: 11,27
Step 4: 15 arrives, HashFun1: 15%4=3, HashFun2: 15%8=7;
since 7!=3, spill 15 to batch7.
Step 5: 27 arrives, 27%4=3, 27%8=3, 27%16=11;
since 11!=3, spill 27 to batch11.
Final state:
chainLen[3]=3
batch3: 3,19,35
batch7: 7,15,23,31,15
batch11: 11,27,27
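To double-check the walkthrough above, here is a small standalone simulation
of the hash function chain. It is a sketch only: it hashes the raw key values
directly instead of real hash values, and the names chain_batchno, MEM_LIMIT,
etc. are made up for illustration. Its output matches the final state listed
above.
```
#include <stdio.h>

#define INITIAL_NBATCH 4
#define MEM_LIMIT      4          /* in-memory hash table holds 4 tuples */
#define MY_BATCH       3          /* we are processing batch 3 */

static int chain_len = 1;         /* hashChainLen[3] in the proposal */

/* walk the hash function chain: MOD 4, MOD 8, MOD 16, ... */
static int
chain_batchno(int key)
{
    int batchno = MY_BATCH;

    for (int j = 0; j < chain_len; j++)
    {
        batchno = key & (INITIAL_NBATCH * (1 << j) - 1);
        if (batchno != MY_BATCH)
            return batchno;       /* spills to another batch */
    }
    return batchno;               /* stays in batch 3 */
}

int
main(void)
{
    int input[] = {3, 7, 11, 15, 19, 23, 27, 31, 35, 15, 27};
    int mem[16], nmem = 0;                 /* in-memory tuples of batch 3 */
    int spill[64][16], nspill[64] = {0};   /* spill[b] = file for batch b */

    for (int i = 0; i < (int) (sizeof(input) / sizeof(input[0])); i++)
    {
        int key = input[i];
        int b = chain_batchno(key);

        if (b != MY_BATCH)
        {
            spill[b][nspill[b]++] = key;
            continue;
        }
        mem[nmem++] = key;
        if (nmem > MEM_LIMIT)     /* overflow: split batch 3 only */
        {
            int kept = 0;

            chain_len++;
            for (int k = 0; k < nmem; k++)
            {
                int b2 = chain_batchno(mem[k]);

                if (b2 == MY_BATCH)
                    mem[kept++] = mem[k];
                else
                    spill[b2][nspill[b2]++] = mem[k];
            }
            nmem = kept;
        }
    }

    printf("chainLen[3] = %d\nbatch3 (memory):", chain_len);
    for (int k = 0; k < nmem; k++)
        printf(" %d", mem[k]);
    for (int b = 0; b < 64; b++)
        if (nspill[b] > 0)
        {
            printf("\nbatch%d (file):", b);
            for (int k = 0; k < nspill[b]; k++)
                printf(" %d", spill[b][k]);
        }
    printf("\n");
    return 0;
}
```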
Here is pseudo code for the processing of batch i:
```
/* Step 1: build the hash table for Ri */
foreach tuple in ReadFromFile(Ri)
{
    /* get batchno by the new hash function chain */
    batchno = NewExecHashGetBucketAndBatch(tuple);

    /* spill the tuple if it does not belong to the current batch */
    if (batchno != i)
    {
        spillToFile(tuple, batchno);
        continue;
    }

    flag = InsertTupleToHashTable(HT, tuple);
    if (flag == NEED_SPLIT)
    {
        hashChainLen[i]++;
        /* then call ExecHashIncreaseNumBatches() to do the real spill */
    }
}

/* Step 2: probe stage */
foreach tuple in ReadFromFile(S[i + Bi*k])
{
    batchno = NewExecHashGetBucketAndBatch(tuple);
    if (batchno == curbatch)
        probe and match;
    else
        spillToFile(tuple, batchno);
}
```
This solution only splits the batches that need to be split, in a lazy way.
If this solution makes sense, I would like to write a real patch.
Any comments?
--
Thanks
Hubert Zhang