Re: Re: parallel distinct union and aggregate support patch

From: Dilip Kumar <dilipbalaut(at)gmail(dot)com>
To: "bucoo(at)sohu(dot)com" <bucoo(at)sohu(dot)com>
Cc: Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, tgl <tgl(at)sss(dot)pgh(dot)pa(dot)us>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Re: parallel distinct union and aggregate support patch
Date: 2020-11-03 12:36:55
Message-ID: CAFiTN-sbLaQUhnj8fiekWePC2yeAf60+YvMDGwWN6konahh2YA@mail.gmail.com
Lists: pgsql-hackers

On Thu, Oct 29, 2020 at 12:53 PM bucoo(at)sohu(dot)com <bucoo(at)sohu(dot)com> wrote:
>
> > 1) It's better to always include the whole patch series - including the
> > parts that have not changed. Otherwise people have to scavenge the
> > thread and search for all the pieces, which may be a source of issues.
> > Also, it confuses the patch tester [1] which tries to apply patches from
> > a single message, so it will fail for this one.
> Patches 3 and 4 do not rely on 1 and 2 in the code.
> However, applying patches 3 and 4 directly will fail, because they were
> written on top of 1 and 2.
> I can generate a single combined patch if you need it.
>
> > 2) I suggest you try to describe the goal of these patches, using some
> > example queries, explain output etc. Right now the reviewers have to
> > reverse engineer the patches and deduce what the intention was, which
> > may be causing unnecessary confusion etc. If this was my patch, I'd try
> > to create a couple examples (CREATE TABLE + SELECT + EXPLAIN) showing
> > how the patch changes the query plan, showing speedup etc.
> I have written some example queries into the regression tests, including
> "unique", "union", "group by", and "group by grouping sets".
> Here are my tests; they are not in the regression tests:
> ```sql
> begin;
> create table gtest(id integer, txt text);
> insert into gtest select t1.id,'txt'||t1.id from (select generate_series(1,1000*1000) id) t1,(select generate_series(1,10) id) t2;
> analyze gtest;
> commit;
> set jit = off;
> \timing on
> ```
> normal aggregate times
> ```
> set enable_batch_hashagg = off;
> explain (costs off,analyze,verbose)
> select sum(id),txt from gtest group by txt;
> QUERY PLAN
> -------------------------------------------------------------------------------------------------------------
> Finalize GroupAggregate (actual time=6469.279..8947.024 rows=1000000 loops=1)
>   Output: sum(id), txt
>   Group Key: gtest.txt
>   ->  Gather Merge (actual time=6469.245..8165.930 rows=1000058 loops=1)
>         Output: txt, (PARTIAL sum(id))
>         Workers Planned: 2
>         Workers Launched: 2
>         ->  Sort (actual time=6356.471..7133.832 rows=333353 loops=3)
>               Output: txt, (PARTIAL sum(id))
>               Sort Key: gtest.txt
>               Sort Method: external merge  Disk: 11608kB
>               Worker 0:  actual time=6447.665..7349.431 rows=317512 loops=1
>                 Sort Method: external merge  Disk: 10576kB
>               Worker 1:  actual time=6302.882..7061.157 rows=333301 loops=1
>                 Sort Method: external merge  Disk: 11112kB
>               ->  Partial HashAggregate (actual time=2591.487..4430.437 rows=333353 loops=3)
>                     Output: txt, PARTIAL sum(id)
>                     Group Key: gtest.txt
>                     Batches: 17  Memory Usage: 4241kB  Disk Usage: 113152kB
>                     Worker 0:  actual time=2584.345..4486.407 rows=317512 loops=1
>                       Batches: 17  Memory Usage: 4241kB  Disk Usage: 101392kB
>                     Worker 1:  actual time=2584.369..4393.244 rows=333301 loops=1
>                       Batches: 17  Memory Usage: 4241kB  Disk Usage: 112832kB
>                     ->  Parallel Seq Scan on public.gtest (actual time=0.691..603.990 rows=3333333 loops=3)
>                           Output: id, txt
>                           Worker 0:  actual time=0.104..607.146 rows=3174970 loops=1
>                           Worker 1:  actual time=0.100..603.951 rows=3332785 loops=1
> Planning Time: 0.226 ms
> Execution Time: 9021.058 ms
> (29 rows)
>
> Time: 9022.251 ms (00:09.022)
>
> set enable_batch_hashagg = on;
> explain (costs off,analyze,verbose)
> select sum(id),txt from gtest group by txt;
> QUERY PLAN
> -------------------------------------------------------------------------------------------------
> Gather (actual time=3116.666..5740.826 rows=1000000 loops=1)
>   Output: (sum(id)), txt
>   Workers Planned: 2
>   Workers Launched: 2
>   ->  Parallel BatchHashAggregate (actual time=3103.181..5464.948 rows=333333 loops=3)
>         Output: sum(id), txt
>         Group Key: gtest.txt
>         Worker 0:  actual time=3094.550..5486.992 rows=326082 loops=1
>         Worker 1:  actual time=3099.562..5480.111 rows=324729 loops=1
>         ->  Parallel Seq Scan on public.gtest (actual time=0.791..656.601 rows=3333333 loops=3)
>               Output: id, txt
>               Worker 0:  actual time=0.080..646.053 rows=3057680 loops=1
>               Worker 1:  actual time=0.070..662.754 rows=3034370 loops=1
> Planning Time: 0.243 ms
> Execution Time: 5788.981 ms
> (15 rows)
>
> Time: 5790.143 ms (00:05.790)
> ```
>
> grouping sets times
> ```
> set enable_batch_hashagg = off;
> explain (costs off,analyze,verbose)
> select sum(id),txt from gtest group by grouping sets(id,txt,());
> QUERY PLAN
> ------------------------------------------------------------------------------------------
> GroupAggregate (actual time=9454.707..38921.885 rows=2000001 loops=1)
>   Output: sum(id), txt, id
>   Group Key: gtest.id
>   Group Key: ()
>   Sort Key: gtest.txt
>     Group Key: gtest.txt
>   ->  Sort (actual time=9454.679..11804.071 rows=10000000 loops=1)
>         Output: txt, id
>         Sort Key: gtest.id
>         Sort Method: external merge  Disk: 254056kB
>         ->  Seq Scan on public.gtest (actual time=2.250..2419.031 rows=10000000 loops=1)
>               Output: txt, id
> Planning Time: 0.230 ms
> Execution Time: 39203.883 ms
> (14 rows)
>
> Time: 39205.339 ms (00:39.205)
>
> set enable_batch_hashagg = on;
> explain (costs off,analyze,verbose)
> select sum(id),txt from gtest group by grouping sets(id,txt,());
> QUERY PLAN
> -------------------------------------------------------------------------------------------------
> Gather (actual time=5931.776..14353.957 rows=2000001 loops=1)
>   Output: (sum(id)), txt, id
>   Workers Planned: 2
>   Workers Launched: 2
>   ->  Parallel BatchHashAggregate (actual time=5920.963..13897.852 rows=666667 loops=3)
>         Output: sum(id), txt, id
>         Group Key: gtest.id
>         Group Key: ()
>         Group Key: gtest.txt
>         Worker 0:  actual time=5916.370..14062.461 rows=513810 loops=1
>         Worker 1:  actual time=5916.037..13932.847 rows=775901 loops=1
>         ->  Parallel Seq Scan on public.gtest (actual time=0.399..688.273 rows=3333333 loops=3)
>               Output: id, txt
>               Worker 0:  actual time=0.052..690.955 rows=3349990 loops=1
>               Worker 1:  actual time=0.050..691.595 rows=3297070 loops=1
> Planning Time: 0.157 ms
> Execution Time: 14598.416 ms
> (17 rows)
>
> Time: 14599.437 ms (00:14.599)
> ```

I have done some performance testing with TPC-H to see the impact on the
different query plans. There are a lot of plan changes across various
queries, but out of those there are a few queries where these patches gave
a noticeable gain: query 13 and query 17 (I have attached the plans for
these two queries).

Test details:
----------------
TPC-H scale factor 50 (database size 112GB)
work_mem = 20GB, shared_buffers = 20GB, max_parallel_workers_per_gather = 4
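
For reference, a minimal sketch of applying the settings above for a test
session (the original run may well have configured them in postgresql.conf
instead):

```sql
-- Sketch only: the actual test setup may differ.
set work_mem = '20GB';
set max_parallel_workers_per_gather = 4;
-- shared_buffers cannot be changed per session; it needs a restart, e.g.:
-- ALTER SYSTEM SET shared_buffers = '20GB';  -- then restart the server
```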

Machine information:
Architecture: x86_64
CPU(s): 56
Thread(s) per core: 2
Core(s) per socket: 14
Socket(s): 2
NUMA node(s): 2
Model name: Intel(R) Xeon(R) CPU E5-2695 v3 @ 2.30GHz

Observation:
In the TPC-H test, I noticed that the major gain from this patch comes from
being able to use parallelism where we previously could not, due to the
limitations of parallel aggregation. Currently, computing the final
aggregated result requires breaking the parallelism: each worker performs
only a partial aggregate, and after that we have to gather all the
partially aggregated results and perform the finalize aggregate in the
leader. With this patch, because the results are batched, the final
aggregate can be computed within the workers themselves, which enables
parallelism in more cases.
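
To make the difference concrete, here is a compact sketch of the two plan
shapes, taken from the gtest example quoted above (no new measurements):

```sql
-- Without batching (enable_batch_hashagg = off), the finalize step runs
-- above the Gather, so parallelism ends at the partial aggregate:
--   Finalize GroupAggregate
--     ->  Gather Merge
--           ->  Sort
--                 ->  Partial HashAggregate
--                       ->  Parallel Seq Scan
--
-- With batching (enable_batch_hashagg = on), each worker emits fully
-- aggregated groups, so the Gather only collects finished rows:
--   Gather
--     ->  Parallel BatchHashAggregate
--           ->  Parallel Seq Scan
set enable_batch_hashagg = on;
explain (costs off, analyze, verbose)
select sum(id), txt from gtest group by txt;
```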

Example:
If we look at the plan for query 13 on head (13.explain_head.out), the
subquery performs an aggregate and the outer query groups on the subquery's
aggregated value. Because of this, head does not choose a parallel plan:
the inner aggregation has a huge number of groups, so with parallelism we
would have to transfer a lot of tuples through the tuple queue and
serialize/deserialize that many transition values, and since the outer
query needs the final aggregated results from the inner query, parallelism
cannot be selected. With the batch aggregate (13.explain_patch.out), the
finalize aggregation can be computed within the workers themselves, which
allows the parallelism to continue up to the top node. The execution time
for this query is reduced from 238 sec to 57 sec, which is about 4X faster.
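
For readers without the attachments handy, the standard TPC-H Q13 has
roughly the following shape (the attached 13.sql may use different
substitution parameters): the inner query aggregates per customer, and the
outer query groups on that aggregated count.

```sql
-- Approximate TPC-H Q13: the subquery aggregates per customer and the
-- outer query groups on the aggregated value, which is the pattern
-- described above.
select c_count, count(*) as custdist
from (
    select c_custkey, count(o_orderkey) as c_count
    from customer
    left outer join orders
      on c_custkey = o_custkey
     and o_comment not like '%special%requests%'
    group by c_custkey
) as c_orders (c_custkey, c_count)
group by c_count
order by custdist desc, c_count desc;
```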

I will perform some more tests with different scale factors and analyze
the behavior further.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com

Attachment Content-Type Size
13.explain_head.out application/octet-stream 2.3 KB
17.explain_patch.out application/octet-stream 3.1 KB
17.explain_head.out application/octet-stream 3.3 KB
13.explain_patch.out application/octet-stream 3.8 KB
13.sql application/sql 377 bytes
17.sql application/sql 382 bytes
