The overall goal of this change is to allow table metadata to be loaded in the background while also allowing loading to be prioritized on an as-needed basis. During analysis, any tables that are not loaded are tracked; if analysis fails, the Impalad makes an RPC to the CatalogServer to request that the metadata loading of these tables be prioritized, and analysis is restarted.

To support this, the CatalogServer now has a deque of the tables to load. For background loading, tables are added to the tail of the deque. A new CatalogServer RPC was also added that can prioritize the loading of one or more tables, in which case they are added to the head of the deque. The next table to load is always taken from the head. This helps prioritize loading, but is admittedly not the most fair approach.

To support prioritized loading, some changes had to be made on the Impalad side during analysis:
- During analysis, any tables that are missing metadata are tracked.
- Analysis now runs in a loop. If it fails due to an AnalysisException and at least one table/view was missing metadata, the Impalad requests that the CatalogServer load the missing tables.
- The Impalad waits until the required tables are received (it is notified on each call to updateCatalog()) and does not rerun analysis until all tables are available. Once they are, analysis restarts.

This change also introduces two new flags:
- --load_catalog_in_background (bool). When this is true (the default), the catalog server runs a periodic background thread to queue all unloaded tables for loading. This is generally the desired behavior, but there may be some cases (very large metastores) where it needs to be disabled.
- --num_metadata_loading_threads (int32). The number of threads to use when loading catalog metadata (degree of parallelism). The default is 16, but it can be increased to improve performance at the cost of stressing the Hive metastore/HDFS.

Change-Id: Ib94dbbf66ffcffea8c490f50f5c04d19fb2078ad
Reviewed-on: http://gerrit.ent.cloudera.com:8080/1476
Reviewed-by: Lenni Kuff <lskuff@cloudera.com>
Tested-by: jenkins
Reviewed-on: http://gerrit.ent.cloudera.com:8080/1538
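The head-versus-tail deque behavior described above can be illustrated with a minimal Java sketch. This is only an illustration of the technique, not Impala's actual CatalogServer code; the class and method names (TableLoadQueue, enqueueBackground, prioritize, nextTableToLoad) are hypothetical.

import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the table-load deque: background loading appends to
// the tail, the prioritization RPC pushes to the head, and loader threads
// (--num_metadata_loading_threads of them) always take from the head.
public class TableLoadQueue {
  private final Deque<String> pendingTables = new ArrayDeque<>();

  // Background loading: queue an unloaded table at the tail.
  public synchronized void enqueueBackground(String tableName) {
    if (!pendingTables.contains(tableName)) pendingTables.addLast(tableName);
  }

  // Prioritization RPC: move (or insert) the table at the head so it loads next.
  public synchronized void prioritize(String tableName) {
    pendingTables.remove(tableName);
    pendingTables.addFirst(tableName);
  }

  // Loader thread: the next table to load is always taken from the head.
  public synchronized String nextTableToLoad() {
    return pendingTables.pollFirst(); // null when nothing is pending
  }
}

Taking from the head is what lets prioritized requests jump ahead of queued background work; as noted above, this is simple but not the most fair policy.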
====
---- QUERY
# Tests explaining a query (TPCDS-Q19)
explain
select
i_brand_id,
i_brand,
i_manufact_id,
i_manufact,
sum(ss_ext_sales_price) ext_price
from
tpcds.store_sales
join tpcds.item on (store_sales.ss_item_sk = item.i_item_sk)
join tpcds.customer on (store_sales.ss_customer_sk = customer.c_customer_sk)
join tpcds.customer_address on (customer.c_current_addr_sk = customer_address.ca_address_sk)
join tpcds.store on (store_sales.ss_store_sk = store.s_store_sk)
where
ss_date between '1999-11-01' and '1999-11-30'
and i_manager_id = 7
and substr(ca_zip, 1, 5) <> substr(s_zip, 1, 5)
group by
i_brand,
i_brand_id,
i_manufact_id,
i_manufact
order by
ext_price desc,
i_brand,
i_brand_id,
i_manufact_id,
i_manufact
limit 100
---- RESULTS
'Estimated Per-Host Requirements: Memory=144.66MB VCores=5'
''
'19:TOP-N [LIMIT=100]'
'| order by: SUM(ss_ext_sales_price) DESC, i_brand ASC, i_brand_id ASC, i_manufact_id ASC, i_manufact ASC'
'|'
'18:EXCHANGE [PARTITION=UNPARTITIONED]'
'|'
'10:TOP-N [LIMIT=100]'
'| order by: SUM(ss_ext_sales_price) DESC, i_brand ASC, i_brand_id ASC, i_manufact_id ASC, i_manufact ASC'
'|'
'17:AGGREGATE [MERGE FINALIZE]'
'| output: SUM(SUM(ss_ext_sales_price))'
'| group by: i_brand, i_brand_id, i_manufact_id, i_manufact'
'|'
'16:EXCHANGE [PARTITION=HASH(i_brand,i_brand_id,i_manufact_id,i_manufact)]'
'|'
'09:AGGREGATE'
'| output: SUM(ss_ext_sales_price)'
'| group by: i_brand, i_brand_id, i_manufact_id, i_manufact'
'|'
'08:HASH JOIN [INNER JOIN, BROADCAST]'
'| hash predicates: store_sales.ss_store_sk = store.s_store_sk'
'| other predicates: substr(ca_zip, 1, 5) != substr(s_zip, 1, 5)'
'|'
'|--15:EXCHANGE [BROADCAST]'
'| |'
'| 04:SCAN HDFS [tpcds.store]'
'| partitions=1/1 size=3.08KB'
'|'
'07:HASH JOIN [INNER JOIN, BROADCAST]'
'| hash predicates: store_sales.ss_item_sk = item.i_item_sk'
'|'
'|--14:EXCHANGE [BROADCAST]'
'| |'
'| 01:SCAN HDFS [tpcds.item]'
'| partitions=1/1 size=4.82MB'
'| predicates: i_manager_id = 7'
'|'
'06:HASH JOIN [INNER JOIN, BROADCAST]'
'| hash predicates: customer.c_customer_sk = store_sales.ss_customer_sk'
'|'
'|--13:EXCHANGE [BROADCAST]'
'| |'
'| 00:SCAN HDFS [tpcds.store_sales]'
'| partitions=2/120 size=663.52KB'
'|'
'05:HASH JOIN [INNER JOIN, PARTITIONED]'
'| hash predicates: customer_address.ca_address_sk = customer.c_current_addr_sk'
'|'
'|--12:EXCHANGE [PARTITION=HASH(customer.c_current_addr_sk)]'
'| |'
'| 02:SCAN HDFS [tpcds.customer]'
'| partitions=1/1 size=12.60MB'
'|'
'11:EXCHANGE [PARTITION=HASH(customer_address.ca_address_sk)]'
'|'
'03:SCAN HDFS [tpcds.customer_address]'
' partitions=1/1 size=5.25MB'
====
---- QUERY
# Tests explaining an insert query
explain insert overwrite functional.alltypessmall (id, string_col)
partition (year, month)
select a.id, a.string_col, a.year, a.month from functional.alltypes a
left semi join functional.alltypesagg b on (a.id = b.id)
where a.year > 2009 and a.month = 4
union distinct
select id, string_col, year, month from functional.alltypes
---- RESULTS
'Estimated Per-Host Requirements: Memory=192.01MB VCores=3'
''
'WRITE TO HDFS [functional.alltypessmall, OVERWRITE=true, PARTITION-KEYS=(year,month)]'
'| partitions=96'
'|'
'05:AGGREGATE [FINALIZE]'
'| group by: id, string_col, year, month'
'|'
'08:EXCHANGE [PARTITION=UNPARTITIONED]'
'|'
'|--10:MERGE'
'| |'
'| 04:SCAN HDFS [functional.alltypes]'
'| partitions=24/24 size=956.90KB'
'|'
'09:MERGE'
'|'
'03:HASH JOIN [LEFT SEMI JOIN, PARTITIONED]'
'| hash predicates: a.id = b.id'
'|'
'|--07:EXCHANGE [PARTITION=HASH(b.id)]'
'| |'
'| 02:SCAN HDFS [functional.alltypesagg b]'
'| partitions=10/10 size=743.67KB'
'|'
'06:EXCHANGE [PARTITION=HASH(a.id)]'
'|'
'01:SCAN HDFS [functional.alltypes a]'
' partitions=1/24 size=19.71KB'
====
---- QUERY
# Tests explaining an insert query to/from an HBase table
explain insert into functional_hbase.alltypes
select a.* from functional_hbase.alltypessmall a
cross join functional.alltypessmall b
where a.year > 2009 and a.month = 4
union all
select * from functional_hbase.alltypessmall
---- RESULTS
'Estimated Per-Host Requirements: Memory=1.03GB VCores=3'
''
'WRITE TO HBASE table=functional_hbase.alltypes'
'|'
'06:EXCHANGE [PARTITION=UNPARTITIONED]'
'|'
'|--08:MERGE'
'| |'
'| 04:SCAN HBASE [functional_hbase.alltypessmall]'
'|'
'07:MERGE'
'|'
'03:CROSS JOIN [BROADCAST]'
'|'
'|--05:EXCHANGE [BROADCAST]'
'| |'
'| 02:SCAN HDFS [functional.alltypessmall b]'
'| partitions=4/4 size=6.32KB'
'|'
'01:SCAN HBASE [functional_hbase.alltypessmall a]'
' predicates: a.year > 2009, a.month = 4'
====