Query parallelism has been supported in PostgreSQL for quite a while now. The PG community calls it "parallel query", but it is no longer limited to just SELECT queries: index builds leverage multiple cores, and even utilities like VACUUM now make use of parallelism. Furthermore, the community is working on parallelizing COPY and INSERT.

I was interested in doing a kind of "sanity" check of this capability, specifically on the ARM64 platform. Let's see how it goes. Along the way, we will also try to understand a little bit about how to interpret the parallelism part of the plan output. Subqueries and partitions are not covered in this blog; I will probably cover them in another one.
For running the queries, I generated a scale-5 TPC-H benchmark schema with the help of scripts taken from https://github.com/tvondra/pg_tpch.git. My machine is an 8-CPU VM with 15 GB memory and Ubuntu 18.04, running on a "Kunpeng 920" 2.6 GHz host. The PostgreSQL build was from the git master branch, so you can treat it as somewhere between PostgreSQL 13 and 14. All the tests were run with max_parallel_workers_per_gather = 4. The tables were pre-warmed, so I reduced seq_page_cost and random_page_cost to as low as 0.1.
The JIT-related part of the EXPLAIN output is omitted from the plans to keep the focus on the main query plan. Also, estimated costs are omitted in order to make the plan output compact.
Parallel sequential scan
This is the simplest one, and the one with which query parallelism was introduced in PostgreSQL 9.6.

Just a plain "select * from lineitem" won't give us a parallel scan, because all the tuples would need to be transferred from the workers to the leader backend. A parallel scan is beneficial only when this tuple transfer cost is small enough. So let's reduce the number of rows selected:
tpch=# explain (analyze, costs off)
tpch-# select l_orderkey from lineitem where l_shipmode = 'AIR' and l_shipinstruct = 'TAKE BACK RETURN';
                                            QUERY PLAN
--------------------------------------------------------------------------------------------------
 Gather (actual time=6.264..1776.956 rows=1070891 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Seq Scan on lineitem (actual time=6.959..1640.647 rows=214178 loops=5)
         Filter: ((l_shipmode = 'AIR'::bpchar) AND (l_shipinstruct = 'TAKE BACK RETURN'::bpchar))
         Rows Removed by Filter: 5785781
 Planning Time: 0.205 ms
 Execution Time: 1823.987 ms
So the parallel sequential scan took 1824 ms to execute. Let's compare this with a plain sequential scan:
tpch=# set max_parallel_workers_per_gather TO 0; -- Disable parallelism
                                          QUERY PLAN
----------------------------------------------------------------------------------------------
 Seq Scan on lineitem (actual time=117.795..5077.520 rows=1070891 loops=1)
   Filter: ((l_shipmode = 'AIR'::bpchar) AND (l_shipinstruct = 'TAKE BACK RETURN'::bpchar))
   Rows Removed by Filter: 28928904
 Planning Time: 0.101 ms
 Execution Time: 5123.774 ms
So the parallel sequential scan was around 2.8 times faster.
Some background before we go on to other queries. Parallelism is achieved by distributing the table blocks among the workers, and the parallel workers then do their job of reading and processing tuples from the blocks they were handed. But how do we make sure that no two workers scan the same block? After all, they are running in parallel, and each block should be scanned by exactly one worker, otherwise duplicate rows would be returned. To make this happen, the workers coordinate: they all know they are running in parallel, so they maintain a shared "next block to read" pointer, which each worker advances as it claims its own next block. This type of plan node is called "parallel-aware"; it has the prefix "Parallel" before the plan node name in the EXPLAIN output. A plan node sitting on top of such a parallel-aware node might itself be running in a parallel worker without being aware of it: it ends up processing only a partial set of rows simply because the underlying Parallel Seq Scan is processing its own subset of the table blocks. Such a plan can be called a "parallel-oblivious" plan, for the sake of naming it. We will talk about this more when we discuss parallel joins and aggregates.
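To make the "next block to read" coordination concrete, here is a minimal C sketch of the idea. This is my own illustration, not PostgreSQL's actual code (real workers are processes sharing memory, and the real block allocator is more elaborate); it just shows how an atomic fetch-and-add guarantees that each block is claimed by exactly one worker:

#include <stdatomic.h>
#include <stdio.h>
#include <pthread.h>

#define NBLOCKS  1000   /* hypothetical number of blocks in the table */
#define NWORKERS 4

/* Shared "next block to read" pointer; lives in shared memory in reality. */
static atomic_uint next_block;

static void *worker(void *arg)
{
    int id = *(int *) arg;
    long nscanned = 0;
    unsigned int block;

    /* Atomically claim the next block; no block is handed out twice. */
    while ((block = atomic_fetch_add(&next_block, 1)) < NBLOCKS)
    {
        /* ... read tuples from 'block' and apply filters here ... */
        nscanned++;
    }
    printf("worker %d scanned %ld blocks\n", id, nscanned);
    return NULL;
}

int main(void)
{
    pthread_t threads[NWORKERS];
    int ids[NWORKERS];

    for (int i = 0; i < NWORKERS; i++)
    {
        ids[i] = i;
        pthread_create(&threads[i], NULL, worker, &ids[i]);
    }
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(threads[i], NULL);
    return 0;
}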
Another rule of thumb: a Gather node is the umbrella parallel node, under which all the nodes in the subtree are run in parallel by the workers. A Gather node's job is to gather the tuples returned by each worker and pass them on to the upper node. All the nodes above Gather run in the usual parent backend. There cannot be nested Gather nodes.
Parallel Index Scan

The following query didn't produce a parallel index scan:
tpch=# explain (analyze, costs off) select l_partkey from lineitem where l_partkey < 100000;
                                                  QUERY PLAN
------------------------------------------------------------------------------------------------------------
 Index Only Scan using idx_lineitem_part_supp on lineitem (actual time=0.078..895.358 rows=2999506 loops=1)
   Index Cond: (l_partkey < 100000)
   Heap Fetches: 0
 Planning Time: 0.129 ms
 Execution Time: 1012.693 ms
So let's try reducing parallel_tuple_cost, for the sake of reproducing a parallel index scan:

tpch=# set parallel_tuple_cost TO 0.0001;
tpch=# explain (analyze, costs off) select l_partkey from lineitem where l_partkey < 100000;
                                                         QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
 Gather (actual time=0.390..387.086 rows=2999506 loops=1)
   Workers Planned: 4
   Workers Launched: 4
   ->  Parallel Index Only Scan using idx_lineitem_part_supp on lineitem (actual time=0.098..262.780 rows=599901 loops=5)
         Index Cond: (l_partkey < 100000)
         Heap Fetches: 0
 Planning Time: 0.802 ms
 Execution Time: 509.306 ms
Notes:
parallel_tuple_cost is the cost of transferring tuples from the workers to the leader backend. Note that I used a contrived value of 0.0001 just for the sake of reproducing a parallel index scan. Although this setting gives us an index scan with a faster execution time, it is not recommended to change these costing parameters without obtaining conclusive statistics on your system.
An index-only scan is a special kind of index scan: the index already contains the data required by the select query, so a separate heap scan is avoided; only the index is scanned. A plain index scan or a bitmap heap scan also supports parallelism; we will see more of these in the aggregate and table join examples, where they are more commonly seen.

A parallel index scan does not produce ordered results, unlike a non-parallel index scan. Multiple workers read index blocks in parallel, so although each worker returns its own tuples sorted, the combined result set is not sorted.

A parallel index scan is only supported for btree indexes.
Parallel Aggregate

An aggregate expression in a query typically returns drastically fewer rows than the number of table rows, inherently so, since the values are aggregated over a row set. So there is very little worker-to-leader tuple transfer cost involved, and an aggregate query almost always benefits from parallelism.
tpch=# -- Check out the sequential aggregate plan
tpch=# set max_parallel_workers_per_gather TO 0;
tpch=# explain (analyze, costs off) select max(l_tax) from lineitem;
                                   QUERY PLAN
--------------------------------------------------------------------------------
 Aggregate (actual time=11230.951..11230.951 rows=1 loops=1)
   ->  Seq Scan on lineitem (actual time=0.009..2767.802 rows=29999795 loops=1)
 Planning Time: 0.105 ms
 Execution Time: 11231.739 ms
tpch=# -- Check out the parallel aggregate plan
tpch=# set max_parallel_workers_per_gather TO 4;
tpch=# explain (analyze, costs off) select max(l_tax) from lineitem;
                                            QUERY PLAN
---------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=2150.383..2190.898 rows=1 loops=1)
   ->  Gather (actual time=2150.241..2190.883 rows=5 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial Aggregate (actual time=2137.664..2137.665 rows=1 loops=5)
               ->  Parallel Seq Scan on lineitem (actual time=0.016..563.268 rows=5999959 loops=5)
 Planning Time: 0.896 ms
 Execution Time: 2202.304 ms
So it’s 5 times faster; pretty neat.
Above, we can see that the usual Aggregate plan node is divided into two kinds of Aggregate plan nodes. The one below the Gather node is the Partial Aggregate node, which, as its name implies, aggregates only the values returned by its own worker; it does not run the aggregate's final function yet. That is the task of the Finalize Aggregate node, which combines the partial aggregates returned by all the workers through the Gather node.
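To see why this split works, here is a minimal C sketch (my own illustration, not PostgreSQL code) of partial and finalize aggregation for avg(): the partial step accumulates a transition state (sum, count) per worker, and the finalize step combines those states before applying the final function (the division):

#include <stdio.h>

#define NWORKERS 4

/* Transition state for avg(): what a Partial Aggregate accumulates. */
typedef struct AvgState { double sum; long count; } AvgState;

/* Partial step: each worker aggregates only its own share of rows. */
static AvgState partial_avg(const double *rows, int nrows)
{
    AvgState s = {0.0, 0};
    for (int i = 0; i < nrows; i++) { s.sum += rows[i]; s.count++; }
    return s;   /* note: no division yet -- the final function hasn't run */
}

int main(void)
{
    /* Hypothetical per-worker row slices, as a Parallel Seq Scan would produce. */
    double slices[NWORKERS][3] = {
        {0.01, 0.02, 0.03}, {0.04, 0.05, 0.06},
        {0.07, 0.08, 0.00}, {0.02, 0.02, 0.02},
    };
    AvgState combined = {0.0, 0};

    /* Finalize step: combine the partial states gathered from all workers... */
    for (int w = 0; w < NWORKERS; w++)
    {
        AvgState part = partial_avg(slices[w], 3);
        combined.sum += part.sum;
        combined.count += part.count;
    }
    /* ...and only now apply the final function (sum / count). */
    printf("avg = %f\n", combined.sum / combined.count);
    return 0;
}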
Joins
We will analyze three different join types using this query:
select avg(l_discount)
from orders, lineitem
where l_orderkey = o_orderkey
  and o_orderdate < date '1995-03-09'
  and l_shipdate > date '1995-03-09';
Merge Join
                                                                 QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate (actual time=5030.051..5241.390 rows=1 loops=1)
   ->  Gather (actual time=5029.991..5241.370 rows=5 loops=1)
         Workers Planned: 4
         Workers Launched: 4
         ->  Partial Aggregate (actual time=5015.939..5015.940 rows=1 loops=5)
               ->  Merge Join (actual time=199.287..4987.159 rows=149782 loops=5)
                     Merge Cond: (lineitem.l_orderkey = orders.o_orderkey)
                     ->  Parallel Index Scan using idx_lineitem_orderkey on lineitem (actual time=198.962..2095.747 rows=3248732 loops=5)
                           Filter: (l_shipdate > '1995-03-09'::date)
                           Rows Removed by Filter: 2751227
                     ->  Index Scan using orders_pkey on orders (actual time=0.057..2343.402 rows=3625756 loops=5)
                           Filter: (o_orderdate < '1995-03-09'::date)
                           Rows Removed by Filter: 3874054
 Planning Time: 0.290 ms
 Execution Time: 5243.194 ms
As you can see above, the Merge Join is under a Gather node; that means it is being executed in parallel workers. Is the merge join executed with some coordination among the workers? In other words, is the Merge Join parallel-aware? No: it does not have a "Parallel" prefix. A Merge Join needs sorted input on both the outer and the inner side, hence the index scans on both sides. Now, when the Merge Join is executed in a worker, a subset of the outer table is joined with the full inner side. This is possible because the outer side is scanned using a Parallel Index Scan, while the inner side is a normal Index Scan, meaning each worker does a full index scan of the inner side. Effectively, the merge join work is divided, thanks to the data being divided by the underlying Parallel Index Scan, and the Merge Join does not even know that it is being run in parallel! The caveat is that the inner side has to be redundantly scanned in full by each worker, followed by a sort if required; in our case the sort was not necessary because of the index.
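A minimal C sketch of this parallel-oblivious merge join (my own illustration; the values and slices are made up): each worker receives a sorted slice of the outer input, courtesy of the parallel index scan, and walks the full sorted inner input, whose keys are unique here, like orders_pkey:

#include <stdio.h>

/* One worker's merge join: sorted outer slice against the full sorted inner. */
static void merge_join_worker(const int *outer, int nouter,
                              const int *inner, int ninner)
{
    int i = 0;
    for (int o = 0; o < nouter; o++)
    {
        /* Advance the inner side until it catches up with the outer key. */
        while (i < ninner && inner[i] < outer[o])
            i++;
        if (i < ninner && inner[i] == outer[o])
            printf("match: %d\n", outer[o]);   /* emit the joined row */
    }
}

int main(void)
{
    int inner[]   = {1, 2, 3, 5, 8, 13, 21};   /* full inner side, sorted */
    int worker0[] = {2, 5, 9};                 /* sorted outer slice, worker 0 */
    int worker1[] = {13, 21, 22};              /* sorted outer slice, worker 1 */

    /* Each worker redundantly scans the inner side for its own outer slice. */
    merge_join_worker(worker0, 3, inner, 7);
    merge_join_worker(worker1, 3, inner, 7);
    return 0;
}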
There is scope for improvement here: the Merge Join could be made parallel-aware by appropriately partitioning the sorted data of both tables and merge-joining the pairs of partitions in parallel. But that discussion needs a separate blog.
Parallel Hash Join

The inner side of the Hash Join is a "Parallel Hash" node. This plan builds a shared hash table by dividing the build work among coordinating parallel workers. As with a sequential Hash Join, the outer side waits until the hash table is built. Once it is built, the same workers start scanning the outer table and doing the join using the shared hash table. The outer scan is essentially a partial scan, because each worker does its part in parallel; in our case, it's a parallel sequential scan.
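Conceptually, the shared build looks something like the following simplified C sketch. This is only my illustration of the idea (PostgreSQL's real Parallel Hash lives in shared memory and is considerably more elaborate, with batching and synchronization phases): workers insert into common buckets using atomic compare-and-swap, so the hash-building work can be divided:

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

#define NBUCKETS 64

/* One entry in a bucket chain of the shared hash table. */
typedef struct Entry {
    int key;
    struct Entry *next;
} Entry;

/* Shared bucket heads; lock-free inserts via compare-and-swap. */
static _Atomic(Entry *) buckets[NBUCKETS];

/* Build phase: each worker calls this for the inner-side rows it scanned. */
static void shared_hash_insert(int key)
{
    Entry *e = malloc(sizeof(Entry));
    e->key = key;

    int b = (unsigned) key % NBUCKETS;

    /* Push onto the bucket chain atomically, so concurrent workers
     * building the same shared table never lose an entry. */
    e->next = atomic_load(&buckets[b]);
    while (!atomic_compare_exchange_weak(&buckets[b], &e->next, e))
        ;   /* e->next was refreshed with the current head; retry */
}

/* Probe phase: any worker can look up the shared table read-only. */
static int shared_hash_probe(int key)
{
    for (Entry *e = atomic_load(&buckets[(unsigned) key % NBUCKETS]);
         e != NULL; e = e->next)
        if (e->key == key)
            return 1;
    return 0;
}

int main(void)
{
    /* Build (would be divided among parallel workers). */
    for (int k = 0; k < 100; k += 2)
        shared_hash_insert(k);

    /* Probe with outer-side keys (each worker probes its own outer slice). */
    printf("42 -> %d, 43 -> %d\n", shared_hash_probe(42), shared_hash_probe(43));
    return 0;
}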
Parallel-oblivious Hash Join
If the inner side is just a Hash node rather than a "Parallel Hash" node, it means a separate full hash table is built by each of the workers instead of one shared hash table, which is obviously more expensive than the parallel hash because the hash-building work is not divided:
Nested Loop Join

By nature, a nested loop join has to scan the whole inner side for each outer tuple. So we can divide the outer scan among the workers and have each worker scan the complete inner table, which gives us a parallel-oblivious Nested Loop Join. There is no need to make it parallel-aware.
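The same partial-outer pattern as the merge join sketch above applies here; a minimal C illustration (again made-up data, not PostgreSQL code):

#include <stdio.h>

/* One worker's nested loop join: its outer slice against the full inner side. */
static void nestloop_worker(const int *outer, int nouter,
                            const int *inner, int ninner)
{
    for (int o = 0; o < nouter; o++)
        for (int i = 0; i < ninner; i++)    /* full inner scan per outer row */
            if (outer[o] == inner[i])
                printf("match: %d\n", outer[o]);
}

int main(void)
{
    int inner[]   = {4, 8, 15, 16, 23, 42};
    int worker0[] = {8, 42};    /* outer slice handed to worker 0 */
    int worker1[] = {15, 99};   /* outer slice handed to worker 1 */

    nestloop_worker(worker0, 2, inner, 6);
    nestloop_worker(worker1, 2, inner, 6);
    return 0;
}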
Sequential Join
If we disable parallelism, we get a sequential hash join. Note that all of the above parallel joins are reasonably faster than the sequential join below …
Gather Merge

Gather Merge is a modified version of the Gather node; it basically parallelizes the sort. The Gather Merge node receives sorted output from each worker, merges those streams, and returns fully sorted output.
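Here is a minimal C sketch of that merge step (my own illustration with made-up data; for efficiency, the real implementation keeps the stream heads in a binary heap rather than scanning them linearly): each worker hands over an already-sorted stream, and the leader repeatedly emits the smallest head, producing globally sorted output without re-sorting anything:

#include <stdio.h>
#include <limits.h>

#define NWORKERS 4
#define NROWS    3

int main(void)
{
    /* Hypothetical sorted per-worker outputs. */
    int streams[NWORKERS][NROWS] = {
        {1, 5, 9}, {2, 6, 7}, {0, 4, 8}, {3, 10, 11},
    };
    int pos[NWORKERS] = {0};   /* read position within each stream */

    for (int emitted = 0; emitted < NWORKERS * NROWS; emitted++)
    {
        int best = -1, bestval = INT_MAX;

        /* Pick the stream whose next unread value is the smallest. */
        for (int w = 0; w < NWORKERS; w++)
            if (pos[w] < NROWS && streams[w][pos[w]] < bestval)
            {
                best = w;
                bestval = streams[w][pos[w]];
            }
        pos[best]++;
        printf("%d ", bestval);   /* emitted in globally sorted order */
    }
    printf("\n");
    return 0;
}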
A sequential sort took almost twice as long:
                                   QUERY PLAN
---------------------------------------------------------------------------------
 Sort (actual time=14399.200..18068.514 rows=23998124 loops=1)
   Sort Key: l_suppkey
   Sort Method: external merge  Disk: 610560kB
   ->  Seq Scan on lineitem (actual time=16.346..4320.823 rows=23998124 loops=1)
         Filter: (l_suppkey > 10000)
         Rows Removed by Filter: 6001671
 Planning Time: 0.086 ms
 Execution Time: 20015.980 ms
There are a lot of other scenarios where parallelism can be observed, but I will probably take them up in a later blog …