A quick sanity testing of PostgreSQL parallel query on Arm64

译者: bzhaoopenstack
作者: Amit Dattatray Khandekar
原文链接: https://amitdkhan-pg.blogspot.com/2021/04/a-quick-sanity-testing-of-parallel.html

PG在很久之前就已经支持并行查询,但是对于Arm64平台它是否有完备性是需要考究的。有很大的可能是还没有人真正完备的测试过,下面我们通过测试结果来看看它在ARM上的表现,同时理解并行查询的真正含义。

A quick sanity testing of PostgreSQL parallel query on Arm64

PostgreSQL已经支持查询并行很长时间了。PG社区中的人们称之为“Parallel query”,但现在它不仅限于SELECT查询。包括索引构建会均分到多个物理核心;甚至像VACUUM这样实用的程序现在也利用了并行技术。此外,社区正在努力并行化复制和插入操作。

我乐于在ARM64平台上对此功能进行“健全性”检查。让我们一起看看进展如何。同时,我们将了解如何解释计划输出的并行化部分。本文不涉及子查询和分区;我会在另一个博客中介绍。
为了运行查询,我使用脚本https://github.com/tvondra/pg_tpch.git 生成了一个scale-5 TPC-H基准模型.测试机器是一个8 CPU虚拟机,15GB内存,Ubuntu 18.04,运行在“鲲鹏920”2.6 GHz主机上。PostgreSQL构建使用的是git master分支,因此您可以在PostgreSQL 13和14之间的某个节点来进行分析。所有测试都是在max_parallel_workers_per_gather = 4的情况下运行的。这些表是预热的,所以我将seq_page_cost和random_page_Cost降低到0.1。

查询计划中省略了EXPLAIN输出中与JIT相关的部分,以保证关注的重点在主查询计划上。此外,为了使计划输出紧凑,我们也省略了执行估计成本。

并行顺序扫描

这是最简单的一个,也是PostgreSQL 9.6中引入查询并行性功能的其中一个。

仅仅一个简单的“select * from lineitem”不会执行并行扫描,因为所有元组都需要从workers传输到master后端。只有当这个元组传输成本足够小时,并行扫描才是有益的。因此,减少选定的行数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

tpch=# explain (analyze, costs off)
tpch-# select l_orderkey from lineitem where l_shipmode = 'AIR' and l_shipinstruct = 'TAKE BACK RETURN';

QUERY PLAN
--------------------------------------------------------------------------------------------------
Gather (actual time=6.264..1776.956 rows=1070891 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Seq Scan on lineitem (actual time=6.959..1640.647 rows=214178 loops=5)
Filter: ((l_shipmode = 'AIR'::bpchar) AND (l_shipinstruct = 'TAKE BACK RETURN'::bpchar))
Rows Removed by Filter: 5785781
Planning Time: 0.205 ms
Execution Time: 1823.987 ms

So parallel sequential scan took 1824 ms to execute. Let's compare this with sequential scan :

tpch=# set max_parallel_workers_per_gather TO 0; -- Disable parallelism

QUERY PLAN
--------------------------------------------------------------------------------------------
Seq Scan on lineitem (actual time=117.795..5077.520 rows=1070891 loops=1)
Filter: ((l_shipmode = 'AIR'::bpchar) AND (l_shipinstruct = 'TAKE BACK RETURN'::bpchar))
Rows Removed by Filter: 28928904
Planning Time: 0.101 ms
Execution Time: 5123.774 ms

并行顺序扫描会有2.5倍速率提升。

在进行其他查询之前,介绍一下我们需要了解的背景…并行性是通过将表的块数据分配给workers来实现的,然后并行的workers将从分给它们的块数据中读取和处理元组。但是,如何确保没有两个workers扫描同一个块呢?毕竟,它们是并行运行的,因此应该确保每个块只由一个特定的worker扫描,否则结果将返回重复的行。为了实现这一目标,workers之间要进行协调。所有的worker都需要意识到它们都是并行运行的,因此workers保留了一个共享的“下一个要读取的块”指针,每个worker在选择自己的下一个块后更新这个指针。将这种类型的并行计划节点称为“并行感知”;它在EXPLAIN输出中的计划名称之前有一个前缀“并行”段。处于这种并行感知的查询计划节点可能正在并行工作,但它自己可能不关心,而实际上,它只并行处理部分行集,因为并行顺序扫描正在处理其它的表块集。为了便于命名,这样的计划可以被称为“平行遗忘”(parallel-oblivious)计划。当我们讨论并行join和聚合操作时再介绍更多相关内容。

另一个经验是:聚集节点是伞形并行节点,在该节点下,子树中的所有节点都由workers来并行运行。聚集节点的工作是收集每个工作节点返回的元组,并将其传递到上层节点。聚集节点上面的所有节点通常都在其父节点后运行。不能有嵌套的聚集节点。

索引扫描

以下查询没有触发并行索引扫描:

1
2
3
4
5
6
7
8
9
tpch=# explain (analyze, costs off)
select l_partkey from lineitem where l_partkey < 100000;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Index Only Scan using idx_lineitem_part_supp on lineitem (actual time=0.078..895.358 rows=2999506 loops=1)
Index Cond: (l_partkey < 100000)
Heap Fetches: 0
Planning Time: 0.129 ms
Execution Time: 1012.693 ms

因此,尝试降低parallel_tuple_cost,以便再现并行索引扫描:

1
2
3
4
5
6
7
8
9
10
11
12
tpch=# set parallel_tuple_cost TO 0.0001;                                       
tpch=# explain (analyze, costs off) select l_partkey from lineitem where l_partkey < 100000;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Gather (actual time=0.390..387.086 rows=2999506 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Only Scan using idx_lineitem_part_supp on lineitem (actual time=0.098..262.780 rows=599901 loops=5)
Index Cond: (l_partkey < 100000)
Heap Fetches: 0
Planning Time: 0.802 ms
Execution Time: 509.306 ms

注意:

parallel_tuple_cost是将元组从workers传输到leader后端的成本。注意,这里人为的设置.0001只是为了再现并行索引扫描。虽然设置它为加速索引扫描的执行时间,但不建议在没有系统的结论性统计数据的情况下更改这些成本计算参数。

Index-only扫描是一种特殊的索引扫描,因为索引已经具有select查询所需的数据,因此避免了单一次的堆扫描;仅仅扫描索引。完全的索引扫描或位图堆扫描也支持并行;我们将在聚合或表join示例中看到更多,是常见的操作。

与非并行索引扫描不同,并行索引扫描不会产生有序结果。多个workers并行读取索引块。因此,尽管每个worker返回自己排序的元组,但由于是并行无序读取索引块,结果集并不会排序。

并行索引扫描支持btree索引的扫描。

并行聚合

与表行数相比,查询中的聚合表达式返回的行数通常要少得多,因为这些值是从行集返回的聚合值。因此,涉及的worker-leader元组传输成本非常低,聚合查询几乎总是由于并行性而受益。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
tpch=# -- Check out sequential aggregate plan
tpch=# set max_parallel_workers_per_gather TO 0;
tpch=# explain (analyze, costs off) select max(l_tax) from lineitem;;
QUERY PLAN
--------------------------------------------------------------------------------
Aggregate (actual time=11230.951..11230.951 rows=1 loops=1)
-> Seq Scan on lineitem (actual time=0.009..2767.802 rows=29999795 loops=1)
Planning Time: 0.105 ms
Execution Time: 11231.739 ms


tpch=# -- Check out parallel aggregate plan
tpch=# set max_parallel_workers_per_gather TO 4;
tpch=# explain (analyze, costs off) select max(l_tax) from lineitem;;
QUERY PLAN
---------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=2150.383..2190.898 rows=1 loops=1)
-> Gather (actual time=2150.241..2190.883 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=2137.664..2137.665 rows=1 loops=5)
-> Parallel Seq Scan on lineitem (actual time=0.016..563.268 rows=5999959 loops=5)
Planning Time: 0.896 ms
Execution Time: 2202.304 ms

速率提升5倍.

上面,我们可以看到,通常的Aggregate计划节点分为两种。聚集节点下面的一个是局部Aggregate节点,顾名思义,该节点仅对其自己的worker返回的值进行聚合。这意味着它还没有运行最终函数。最终的Aggregate组合了所有workers通过聚集节点返回的局部Aggregate。

Joins

我们使用相同的查询来分析三种不同的join:

1
2
3
4
5
select avg(l_discount) from orders, lineitem
where
l_orderkey = o_orderkey
and o_orderdate < date '1995-03-09'
and l_shipdate > date '1995-03-09';

Merge Join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
                                                                QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=5030.051..5241.390 rows=1 loops=1)
-> Gather (actual time=5029.991..5241.370 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=5015.939..5015.940 rows=1 loops=5)
-> Merge Join (actual time=199.287..4987.159 rows=149782 loops=5)
Merge Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem (actual time=198.962..2095.747 rows=3248732 loops=5)
Filter: (l_shipdate > '1995-03-09'::date)
Rows Removed by Filter: 2751227
-> Index Scan using orders_pkey on orders (actual time=0.057..2343.402 rows=3625756 loops=5)
Filter: (o_orderdate < '1995-03-09'::date)
Rows Removed by Filter: 3874054
Planning Time: 0.290 ms
Execution Time: 5243.194 ms

如上文所示,Merge Join位于聚集节点下。这意味着,Merge Join加入正在并行worker中执行。Merge Join是否正在与其他workers协调来执行?或者,换句话说,Merge Join是并行感知的吗?否。如上,Merge Join没有“并行”前缀。Merge Join需要对从外部和内部的输入进行排序,因此我们在内部和外部都进行索引扫描。现在,当在worker中执行Merge Join时,外部表的子集将与完整的内部表join。这是有可能的,因为外部是执行并行索引扫描的,而内部是正常的索引扫描,这意味着每个worker都会对内部进行完整的索引扫描。实际上,Merge Join的数据是被分割的,这是由于被并行索引扫描分割的数据,Merge Join甚至不知道它是正在并行运行!需要注意的是,每个worker必须对内侧进行冗余扫描,然后如果需要再进行排序。在我们的例子中,由于索引的原因,排序操作是不必要的。

通过适当的将两个表分区排序,并行执行分区数据集的Merge Join,使Merge Join并行感知,有一个改进点。但讨论这个问题需要另一篇博客。

并行感知的 Hash Join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

QUERY PLAN
-------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=3054.189..3099.784 rows=1 loops=1)
-> Gather (actual time=3022.810..3099.755 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=3007.931..3007.934 rows=1 loops=5)
-> Parallel Hash Join (actual time=643.552..2980.305 rows=149782 loops=5)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Parallel Seq Scan on lineitem (actual time=0.030..1685.258 rows=3248732 loops=5)
Filter: (l_shipdate > '1995-03-09'::date)
Rows Removed by Filter: 2751227
-> Parallel Hash (actual time=639.508..639.508 rows=725169 loops=5)
Buckets: 4194304 Batches: 1 Memory Usage: 174688kB
-> Parallel Seq Scan on orders (actual time=14.083..384.196 rows=725169 loops=5)
Filter: (o_orderdate < '1995-03-09'::date)
Rows Removed by Filter: 774831
Planning Time: 0.300 ms
Execution Time: 3101.937 ms

Hash Join内包含一个“并行哈希”节点。计划通过协调并行workers划分构建单一worker并共享哈希表。与顺序Hash Join一样,外层会等待哈希表的构建。当其构建后,相同的workers开始扫描外层表,并使用共享的哈希表执行join。外部扫描本质上是部分扫描,因为每个worker并行执行。所以在我们的例子中,这是一个并行顺序扫描。

并行遗忘的 Hash Join

如果内部只是一个哈希节点,而不是一个“并行哈希”节点,那么这意味着:每个worker节点将构建一个单独完整的哈希表,而不是一个共享哈希表,由于没有划分哈希构建工作,这显然比并行哈希更昂贵:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=5908.032..5971.214 rows=1 loops=1)
-> Gather (actual time=5852.417..5971.167 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=5850.930..5850.933 rows=1 loops=5)
-> Hash Join (actual time=2309.307..5826.753 rows=149782 loops=5)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Parallel Seq Scan on lineitem (actual time=12.631..1712.443 rows=3248732 loops=5)
Filter: (l_shipdate > '1995-03-09'::date)
Rows Removed by Filter: 2751227
-> Hash (actual time=2290.063..2290.065 rows=3625845 loops=5)
Buckets: 2097152 Batches: 4 Memory Usage: 48222kB
-> Bitmap Heap Scan on orders (actual time=502.264..1512.424 rows=3625845 loops=5)
Recheck Cond: (o_orderdate < '1995-03-09'::date)
Heap Blocks: exact=138113
-> Bitmap Index Scan on idx_orders_orderdate (actual time=451.552..451.552 rows=3625845 loops=5)
Index Cond: (o_orderdate < '1995-03-09'::date)
Planning Time: 0.291 ms
Execution Time: 5977.966 ms

嵌套循环 Join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=7211.122..7258.289 rows=1 loops=1)
-> Gather (actual time=7193.150..7258.259 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=7129.209..7129.210 rows=1 loops=5)
-> Nested Loop (actual time=13.924..7100.095 rows=149782 loops=5)
-> Parallel Seq Scan on lineitem (actual time=13.621..1919.712 rows=3248732 loops=5)
Filter: (l_shipdate > '1995-03-09'::date)
Rows Removed by Filter: 2751227
-> Result Cache (actual time=0.001..0.001 rows=0 loops=16243662)
Cache Key: lineitem.l_orderkey
Hits: 2450631 Misses: 844081 Evictions: 0 Overflows: 0 Memory Usage: 61379kB
Worker 0: Hits: 2443189 Misses: 841050 Evictions: 0 Overflows: 0 Memory Usage: 61158kB
Worker 1: Hits: 2350093 Misses: 808929 Evictions: 0 Overflows: 0 Memory Usage: 58824kB
Worker 2: Hits: 2424018 Misses: 833681 Evictions: 0 Overflows: 0 Memory Usage: 60615kB
Worker 3: Hits: 2417114 Misses: 830876 Evictions: 0 Overflows: 0 Memory Usage: 60407kB
-> Index Scan using orders_pkey on orders (actual time=0.004..0.004 rows=0 loops=4158617)
Index Cond: (o_orderkey = lineitem.l_orderkey)
Filter: (o_orderdate < '1995-03-09'::date)
Rows Removed by Filter: 1
Planning Time: 0.294 ms
Execution Time: 7268.857 ms

从本质上讲,嵌套循环join必须为每个外部元组扫描整个内层。因此,我们可以在workers之间划分外部扫描,并由每个worker进行完整的内部表扫描,这将为我们提供一个并行遗忘的嵌套循环join。没有必要使它具有并行意识。

顺序 Join

If we disable parallelism, we can see a sequential hash join. Note that all of the above parallel joins are reasonably fater than the below sequential join …
如果我们禁用并行功能,将会是顺序哈希join。注意,上面所有的并行Join都比下面的顺序join树状结构要胖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
tpch=# set max_parallel_workers_per_gather TO  0;

QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Aggregate (actual time=15714.776..15714.779 rows=1 loops=1)
-> Hash Join (actual time=5134.219..15603.861 rows=748912 loops=1)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Bitmap Heap Scan on lineitem (actual time=2837.938..7162.214 rows=16243662 loops=1)
Recheck Cond: (l_shipdate > '1995-03-09'::date)
Heap Blocks: exact=607593
-> Bitmap Index Scan on idx_lineitem_shipdate (actual time=2556.845..2556.845 rows=16243662 loops=1)
Index Cond: (l_shipdate > '1995-03-09'::date)
-> Hash (actual time=2290.201..2290.202 rows=3625845 loops=1)
Buckets: 2097152 Batches: 4 Memory Usage: 48222kB
-> Bitmap Heap Scan on orders (actual time=563.536..1548.176 rows=3625845 loops=1)
Recheck Cond: (o_orderdate < '1995-03-09'::date)
Heap Blocks: exact=138113
-> Bitmap Index Scan on idx_orders_orderdate (actual time=333.284..333.285 rows=3625845 loops=1)
Index Cond: (o_orderdate < '1995-03-09'::date)
Planning Time: 0.267 ms
Execution Time: 15727.275 ms

Gather Merge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
tpch=# explain (analyze, costs off)
select l_orderkey from lineitem where l_suppkey > 10000 order by l_suppkey ;
QUERY PLAN
----------------------------------------------------------------------------------------------
Gather Merge (actual time=3351.705..8310.367 rows=23998124 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Sort (actual time=3181.446..3896.115 rows=4799625 loops=5)
Sort Key: l_suppkey
Sort Method: external merge Disk: 136216kB
Worker 0: Sort Method: external merge Disk: 120208kB
Worker 1: Sort Method: external merge Disk: 116392kB
Worker 2: Sort Method: external merge Disk: 123520kB
Worker 3: Sort Method: external merge Disk: 114264kB
-> Parallel Seq Scan on lineitem (actual time=55.688..915.160 rows=4799625 loops=5)
Filter: (l_suppkey > 10000)
Rows Removed by Filter: 1200334
Planning Time: 0.102 ms
Execution Time: 9654.078 ms

Gather Merge是聚集计划的改进版。它基本上将排序并行了。因此,聚集从worker中获取排序输出,然后合并它们并返回排序后的输出。

顺序排序几乎花了两倍的时间:

1
2
3
4
5
6
7
8
9
10
                                   QUERY PLAN                                    
---------------------------------------------------------------------------------
Sort (actual time=14399.200..18068.514 rows=23998124 loops=1)
Sort Key: l_suppkey
Sort Method: external merge Disk: 610560kB
-> Seq Scan on lineitem (actual time=16.346..4320.823 rows=23998124 loops=1)
Filter: (l_suppkey > 10000)
Rows Removed by Filter: 6001671
Planning Time: 0.086 ms
Execution Time: 20015.980 ms

还有很多其他的场景可以观察到并行性,但我可能会在后面的博客中讨论它。

Query parallelism is supported in PostgreSQL since quite a while now. People in the PG community call it “Parallel query”, but by now it is not limited to just SELECT queries. Index build leverages multiple cores; and even utilities like VACUUM now make use of parallelism. Furthermore, community is working on parallelizing COPY and INSERTs.

I was interested to do kind-of “sanity” check of this capability specifically on ARM64 platform. Let’s see how it goes. And also at the same time, we will try to understand little bit of how to interpret the parallelism part of the plan output. Subqueries and partitions are not covered in this blog; probably I will add it in another blog.

For running the queries I generated a scale-5 TPC-H benchmark schema with the help of scripts taken from https://github.com/tvondra/pg_tpch.git. My machine is an 8 CPU VM with 15GB memory and Ubuntu 18.04, running on a “Kunpeng 920” 2.6 GHz host. The PostgreSQL build was using git master branch, so you can treat it somewhere between PostgreSQL 13 and 14. All the tests were run with max_parallel_workers_per_gather = 4. The tables were pre-warmed, so I reduced seq_page_cost and random_page_cost to as low as 0.1.

The JIT-related part of the EXPLAIN output is omitted from the plans to keep the focus on the main query plan. Also, estimated costs are omitted in order to make the plan output compact.

Parallel sequential scan

This is the simplest one, and the one with which query parallelism got introduced in PostgreSQL 9.6.

Just a plain “select * from lineitem” won’t give us a parallel scan, because all the tuples need to be transferred from workers to the leader backend. Parallel scan is beneficial only when this tuple transfer cost is small enough. So let’s reduce the number of rows selected :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
tpch=# explain (analyze, costs off)
tpch-# select l_orderkey from lineitem where l_shipmode = 'AIR' and l_shipinstruct = 'TAKE BACK RETURN';

QUERY PLAN
--------------------------------------------------------------------------------------------------
Gather (actual time=6.264..1776.956 rows=1070891 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Seq Scan on lineitem (actual time=6.959..1640.647 rows=214178 loops=5)
Filter: ((l_shipmode = 'AIR'::bpchar) AND (l_shipinstruct = 'TAKE BACK RETURN'::bpchar))
Rows Removed by Filter: 5785781
Planning Time: 0.205 ms
Execution Time: 1823.987 ms

So parallel sequential scan took 1824 ms to execute. Let's compare this with sequential scan :

tpch=# set max_parallel_workers_per_gather TO 0; -- Disable parallelism

QUERY PLAN
--------------------------------------------------------------------------------------------
Seq Scan on lineitem (actual time=117.795..5077.520 rows=1070891 loops=1)
Filter: ((l_shipmode = 'AIR'::bpchar) AND (l_shipinstruct = 'TAKE BACK RETURN'::bpchar))
Rows Removed by Filter: 28928904
Planning Time: 0.101 ms
Execution Time: 5123.774 ms

So parallel seqscan was around 2.5 times faster.

Just a background before we go for other queries … Parallelism is achieved by distributing the table blocks to the workers, and the parallel workers would then do their job of reading and processing tuples from the blocks they read. But how is it made sure that no two workers scan the same block ? After all, they are running in parallel, so they should make sure that each block should be scanned only by one particular worker, otherwise duplicate rows would be returned. To make this happen, there is a coordination between the workers. They all are aware that they are all running in parallel, so they keep a shared “next block to read” pointer, which each worker updates once it chooses it’s own next block. This type of parallel plan node is called “parallel-aware”; it has a prefix “Parallel” before the plan name in the EXPLAIN output. A plan node sitting on top of such parallel-aware node might itself be running in a parallel worker, but it may not be aware of it, while actually it is processing only a partial set of rows in parallel since the underlying parallel-seq scan is processing its own set of table blocks. Such plan can be called as “parallel-oblivious” plan, for the sake of naming it. We will talk about this more when we discuss parallel joins and aggregates.

Another thumb-rule is : A Gather node is the umbrella parallel node, under which all the nodes in the subtree are run in parallel by workers. A Gather node’s job is to gather the tuples returned by each worker, and pass it on to the upper node. All the nodes above Gather run in the usual parent backend. There cannot be nested Gather nodes.

Index Scan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
The following query didn't produce a parallel index scan :                      

tpch=# explain (analyze, costs off)
select l_partkey from lineitem where l_partkey < 100000;
QUERY PLAN
------------------------------------------------------------------------------------------------------------
Index Only Scan using idx_lineitem_part_supp on lineitem (actual time=0.078..895.358 rows=2999506 loops=1)
Index Cond: (l_partkey < 100000)
Heap Fetches: 0
Planning Time: 0.129 ms
Execution Time: 1012.693 ms

So let's try reducing parallel_tuple_cost, for the sake of reproducing a parallel index scan :
tpch=# set parallel_tuple_cost TO 0.0001;
tpch=# explain (analyze, costs off) select l_partkey from lineitem where l_partkey < 100000;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Gather (actual time=0.390..387.086 rows=2999506 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Parallel Index Only Scan using idx_lineitem_part_supp on lineitem (actual time=0.098..262.780 rows=599901 loops=5)
Index Cond: (l_partkey < 100000)
Heap Fetches: 0
Planning Time: 0.802 ms
Execution Time: 509.306 ms

Notes:

parallel_tuple_cost is the cost of transferring tuples from workers to leader backend. Note that I used a contrived value of .0001 just for the sake of reproducing a parallel index scan. Although setting it is giving us an index scan with faster execution time, it is not recommended to change these costing parameters without obtaining conclusive statistics on your system.

Index-only scan is a special kind of Index Scan, in that the index already has the data required by the select query, so a separate heap scan is avoided; only index is scanned. A plain index scan or a bitmap heap scan also supports parallelism; we will see more of these in aggregate or table join examples where they are more commonly seen.

A parallel index scan does not produce ordered results, unlike a non-parallel index scan. Multiple workers read index blocks in parallel. So although each worker returns its own tuples sorted, together the result set is not sorted due to parallel index block reads.

Parellel index scan is only supported for a btree index.

Parallel Aggregate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
An aggregate expression in a query typically has drastically less number of rows returned, compared to the number of table rows, inheritently since the values are aggregate values returned from over a row set. So there is very less worker-leader tuple transfer cost involved, so aggregate query almost always gets benefited due to parallelism.

tpch=# -- Check out sequential aggregate plan
tpch=# set max_parallel_workers_per_gather TO 0;
tpch=# explain (analyze, costs off) select max(l_tax) from lineitem;;
QUERY PLAN
--------------------------------------------------------------------------------
Aggregate (actual time=11230.951..11230.951 rows=1 loops=1)
-> Seq Scan on lineitem (actual time=0.009..2767.802 rows=29999795 loops=1)
Planning Time: 0.105 ms
Execution Time: 11231.739 ms


tpch=# -- Check out parallel aggregate plan
tpch=# set max_parallel_workers_per_gather TO 4;
tpch=# explain (analyze, costs off) select max(l_tax) from lineitem;;
QUERY PLAN
---------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=2150.383..2190.898 rows=1 loops=1)
-> Gather (actual time=2150.241..2190.883 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=2137.664..2137.665 rows=1 loops=5)
-> Parallel Seq Scan on lineitem (actual time=0.016..563.268 rows=5999959 loops=5)
Planning Time: 0.896 ms
Execution Time: 2202.304 ms

So it’s 5 times faster; pretty neat.

Above, we can see that the usual Aggregate plan node is divided into two kinds of Aggregate plan nodes. The one below Gather node is the Partial Aggregate node, which, as it name implies, does an aggregate of only the values returned by its own worker. It means that it has not run the finalize function yet. That is the task of the Finalize Aggregate, which combines the partial aggregates returned by all the workers through the Gather node.

Joins

We will analyze the three different joins using this query :

1
2
3
4
5
select avg(l_discount) from orders, lineitem
where
l_orderkey = o_orderkey
and o_orderdate < date '1995-03-09'
and l_shipdate > date '1995-03-09';

Merge Join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
                                                                QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=5030.051..5241.390 rows=1 loops=1)
-> Gather (actual time=5029.991..5241.370 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=5015.939..5015.940 rows=1 loops=5)
-> Merge Join (actual time=199.287..4987.159 rows=149782 loops=5)
Merge Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Parallel Index Scan using idx_lineitem_orderkey on lineitem (actual time=198.962..2095.747 rows=3248732 loops=5)
Filter: (l_shipdate > '1995-03-09'::date)
Rows Removed by Filter: 2751227
-> Index Scan using orders_pkey on orders (actual time=0.057..2343.402 rows=3625756 loops=5)
Filter: (o_orderdate < '1995-03-09'::date)
Rows Removed by Filter: 3874054
Planning Time: 0.290 ms
Execution Time: 5243.194 ms

As you can see above, Merge Join is under a Gather node. That means, Merge Join is being executed in a parallel worker. Is the merge join being executed using some coordination with other workers ? Or, in other words, is Merge Join parallel-aware ? No. As you can see, the Merge Join does not have a “Parallel” prefix. Merge Join needs sorted input from both outer side and inner side, hence we have index scans both at inner side and outer side. Now, when a Merge Join is executed in a worker, a subset of outer side table is joined with full inner side. This is possible because the outer side is scanned using parallel Index Scan, and the inner side is a normal Index scan which means each worker does a full Index Scan of inner side. Effectively, the Merge join data is divided, thanks to the data that got divided by underlying Parallel Index Scan, and the Merge join does not even know that it is being run in parallel ! The caveat is that the inner side has to be redundantly scanned fully by each worker, followed by a sort if required. In our case the sort operation was not necessary becaues of the index.

There is a scope for improvement to make the Merge Join parallel-aware, by appropriately partitioning sorted data of both tables and do Merge Join of the pairs of partitioned data sets in parallel. But that discussion would need a separate blog.

Parallel-aware Hash Join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
                                                 QUERY PLAN                                                  
-------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=3054.189..3099.784 rows=1 loops=1)
-> Gather (actual time=3022.810..3099.755 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=3007.931..3007.934 rows=1 loops=5)
-> Parallel Hash Join (actual time=643.552..2980.305 rows=149782 loops=5)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Parallel Seq Scan on lineitem (actual time=0.030..1685.258 rows=3248732 loops=5)
Filter: (l_shipdate > '1995-03-09'::date)
Rows Removed by Filter: 2751227
-> Parallel Hash (actual time=639.508..639.508 rows=725169 loops=5)
Buckets: 4194304 Batches: 1 Memory Usage: 174688kB
-> Parallel Seq Scan on orders (actual time=14.083..384.196 rows=725169 loops=5)
Filter: (o_orderdate < '1995-03-09'::date)
Rows Removed by Filter: 774831
Planning Time: 0.300 ms
Execution Time: 3101.937 ms

The inner side of Hash Join is a “Parallel Hash” node. This plan builds a shared hash table by dividing the work among parallel coordinating workers. As with a sequential Hash Join, the outer side waits until the hash table is built. Once it is built, the same workers now start scanning the outer table and doing the join using the shared hash table. The outer scan is essentially a partial scan because each worker does it in parallel. So in our case, it’s a parallel sequential scan.

Parallel-oblivious Hash Join

If the inner side is just a Hash node rather than a “Parallel Hash” node, then it means: a separate full hash table will be built by each of the workers rather than having a shared hash table, which obviously would be expensive than the parallel hash due to the absence of division of hash building work:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=5908.032..5971.214 rows=1 loops=1)
-> Gather (actual time=5852.417..5971.167 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=5850.930..5850.933 rows=1 loops=5)
-> Hash Join (actual time=2309.307..5826.753 rows=149782 loops=5)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Parallel Seq Scan on lineitem (actual time=12.631..1712.443 rows=3248732 loops=5)
Filter: (l_shipdate > '1995-03-09'::date)
Rows Removed by Filter: 2751227
-> Hash (actual time=2290.063..2290.065 rows=3625845 loops=5)
Buckets: 2097152 Batches: 4 Memory Usage: 48222kB
-> Bitmap Heap Scan on orders (actual time=502.264..1512.424 rows=3625845 loops=5)
Recheck Cond: (o_orderdate < '1995-03-09'::date)
Heap Blocks: exact=138113
-> Bitmap Index Scan on idx_orders_orderdate (actual time=451.552..451.552 rows=3625845 loops=5)
Index Cond: (o_orderdate < '1995-03-09'::date)
Planning Time: 0.291 ms
Execution Time: 5977.966 ms

Nested Loop Join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (actual time=7211.122..7258.289 rows=1 loops=1)
-> Gather (actual time=7193.150..7258.259 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (actual time=7129.209..7129.210 rows=1 loops=5)
-> Nested Loop (actual time=13.924..7100.095 rows=149782 loops=5)
-> Parallel Seq Scan on lineitem (actual time=13.621..1919.712 rows=3248732 loops=5)
Filter: (l_shipdate > '1995-03-09'::date)
Rows Removed by Filter: 2751227
-> Result Cache (actual time=0.001..0.001 rows=0 loops=16243662)
Cache Key: lineitem.l_orderkey
Hits: 2450631 Misses: 844081 Evictions: 0 Overflows: 0 Memory Usage: 61379kB
Worker 0: Hits: 2443189 Misses: 841050 Evictions: 0 Overflows: 0 Memory Usage: 61158kB
Worker 1: Hits: 2350093 Misses: 808929 Evictions: 0 Overflows: 0 Memory Usage: 58824kB
Worker 2: Hits: 2424018 Misses: 833681 Evictions: 0 Overflows: 0 Memory Usage: 60615kB
Worker 3: Hits: 2417114 Misses: 830876 Evictions: 0 Overflows: 0 Memory Usage: 60407kB
-> Index Scan using orders_pkey on orders (actual time=0.004..0.004 rows=0 loops=4158617)
Index Cond: (o_orderkey = lineitem.l_orderkey)
Filter: (o_orderdate < '1995-03-09'::date)
Rows Removed by Filter: 1
Planning Time: 0.294 ms
Execution Time: 7268.857 ms

By nature, nested loop join has to have the whole inner side scanned for each of the outer tuple. So we can divide the outer scan among workers, and have a complete inner table scan by each of the workers, which will give us a parallel-oblivious Nested Loop Join. There is no need to make it parallel-aware.

Sequential Join

If we disable parallelism, we can see a sequential hash join. Note that all of the above parallel joins are reasonably fater than the below sequential join …

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
tpch=# set max_parallel_workers_per_gather TO  0;

QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
Aggregate (actual time=15714.776..15714.779 rows=1 loops=1)
-> Hash Join (actual time=5134.219..15603.861 rows=748912 loops=1)
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Bitmap Heap Scan on lineitem (actual time=2837.938..7162.214 rows=16243662 loops=1)
Recheck Cond: (l_shipdate > '1995-03-09'::date)
Heap Blocks: exact=607593
-> Bitmap Index Scan on idx_lineitem_shipdate (actual time=2556.845..2556.845 rows=16243662 loops=1)
Index Cond: (l_shipdate > '1995-03-09'::date)
-> Hash (actual time=2290.201..2290.202 rows=3625845 loops=1)
Buckets: 2097152 Batches: 4 Memory Usage: 48222kB
-> Bitmap Heap Scan on orders (actual time=563.536..1548.176 rows=3625845 loops=1)
Recheck Cond: (o_orderdate < '1995-03-09'::date)
Heap Blocks: exact=138113
-> Bitmap Index Scan on idx_orders_orderdate (actual time=333.284..333.285 rows=3625845 loops=1)
Index Cond: (o_orderdate < '1995-03-09'::date)
Planning Time: 0.267 ms
Execution Time: 15727.275 ms

Gather Merge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
tpch=# explain (analyze, costs off)
select l_orderkey from lineitem where l_suppkey > 10000 order by l_suppkey ;
QUERY PLAN
----------------------------------------------------------------------------------------------
Gather Merge (actual time=3351.705..8310.367 rows=23998124 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Sort (actual time=3181.446..3896.115 rows=4799625 loops=5)
Sort Key: l_suppkey
Sort Method: external merge Disk: 136216kB
Worker 0: Sort Method: external merge Disk: 120208kB
Worker 1: Sort Method: external merge Disk: 116392kB
Worker 2: Sort Method: external merge Disk: 123520kB
Worker 3: Sort Method: external merge Disk: 114264kB
-> Parallel Seq Scan on lineitem (actual time=55.688..915.160 rows=4799625 loops=5)
Filter: (l_suppkey > 10000)
Rows Removed by Filter: 1200334
Planning Time: 0.102 ms
Execution Time: 9654.078 ms

Gather Merge is a modified version of the Gather plan. It basically parallelizes the sort. So Gather gets sorted output from the workers, and then it merges them and returns sorted output.

A sequential Sort took almost almost twice longer :

1
2
3
4
5
6
7
8
9
10
                                   QUERY PLAN                                    
---------------------------------------------------------------------------------
Sort (actual time=14399.200..18068.514 rows=23998124 loops=1)
Sort Key: l_suppkey
Sort Method: external merge Disk: 610560kB
-> Seq Scan on lineitem (actual time=16.346..4320.823 rows=23998124 loops=1)
Filter: (l_suppkey > 10000)
Rows Removed by Filter: 6001671
Planning Time: 0.086 ms
Execution Time: 20015.980 ms

There are lot of other scenarios where parallelism can be observed, but probably I will take it up in a later blog ….

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×