IMPALA-6228: Control stats extrapolation via tbl prop.

Introduces a new TBLPROPERTY for controlling stats
extrapolation on a per-table basis:

impala.enable.stats.extrapolation=true/false

The property key was chosen to be consistent with
the impalad startup flag --enable_stats_extrapolation
and to make clear that the property is set and
consumed by Impala.

Behavior:
- If the property is not set, then the extrapolation
  behavior is determined by the impalad startup flag.
- If the property is set, it overrides the impalad
  startup flag, i.e., extrapolation can be explicitly
  enabled or disabled regardless of the startup flag.
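For illustration, the precedence rule boils down to the
following minimal, self-contained sketch. It mirrors
HdfsTable.isStatsExtrapolationEnabled() in the diff below;
the class and method names here are hypothetical:

import java.util.HashMap;
import java.util.Map;

public class ExtrapolationPrecedenceSketch {
  static final String PROP = "impala.enable.stats.extrapolation";

  // A set table property overrides the impalad-wide startup flag.
  static boolean isExtrapolationEnabled(Map<String, String> tblProps,
      boolean startupFlag) {
    String propVal = tblProps.get(PROP);
    if (propVal == null) return startupFlag; // unset: flag decides
    return Boolean.parseBoolean(propVal);    // set: property wins either way
  }

  public static void main(String[] args) {
    Map<String, String> unset = new HashMap<>();
    Map<String, String> propFalse = new HashMap<>();
    propFalse.put(PROP, "false");
    Map<String, String> propTrue = new HashMap<>();
    propTrue.put(PROP, "true");
    System.out.println(isExtrapolationEnabled(unset, true));      // true
    System.out.println(isExtrapolationEnabled(propFalse, true));  // false
    System.out.println(isExtrapolationEnabled(propTrue, false));  // true
  }
}

Note that Boolean.parseBoolean() returns true only for a
case-insensitive match of "true", so any other property value
behaves like an explicit false.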

Testing:
- added new unit tests
- core/hdfs run passed

Change-Id: Ie49597bf1b93b7572106abc620d91f199cba0cfd
Reviewed-on: http://gerrit.cloudera.org:8080/9139
Reviewed-by: Alex Behm <alex.behm@cloudera.com>
Tested-by: Impala Public Jenkins
Author: Alex Behm
Date: 2018-01-24 11:58:53 -08:00
Committer: Impala Public Jenkins
parent fc529b7f9f
commit 1a1927b07d
9 changed files with 357 additions and 173 deletions

File: fe/src/main/java/org/apache/impala/analysis/ComputeStatsStmt.java

@@ -52,7 +52,10 @@ import com.google.common.collect.Sets;
* Represents the following statements for statistics collection. Which statistics
* are computed and stored depends on the statement type (incremental or not), the
* clauses used (sampling, partition spec), as well as whether stats extrapolation
* is enabled or not (--enable_stats_extrapolation).
* is enabled or not.
* Stats extrapolation can be configured:
* - at the impalad level with --enable_stats_extrapolation
* - at the table level with HdfsTable.TBL_PROP_ENABLE_STATS_EXTRAPOLATION
*
* 1. COMPUTE STATS <table> [(col_list)] [TABLESAMPLE SYSTEM(<perc>) [REPEATABLE(<seed>)]]
* - Stats extrapolation enabled:
@@ -481,8 +484,10 @@ public class ComputeStatsStmt extends StatementBase {
}
} else {
// Not computing incremental stats.
expectAllPartitions_ = !(table_ instanceof HdfsTable) ||
!BackendConfig.INSTANCE.enableStatsExtrapolation();
expectAllPartitions_ = true;
if (table_ instanceof HdfsTable) {
expectAllPartitions_ = !((HdfsTable) table_).isStatsExtrapolationEnabled();
}
}
if (filterPreds.size() > MAX_INCREMENTAL_PARTITIONS) {
@@ -576,10 +581,14 @@ public class ComputeStatsStmt extends StatementBase {
if (!(table_ instanceof HdfsTable)) {
throw new AnalysisException("TABLESAMPLE is only supported on HDFS tables.");
}
if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) {
throw new AnalysisException(
"COMPUTE STATS TABLESAMPLE requires --enable_stats_extrapolation=true. " +
"Stats extrapolation is currently disabled.");
HdfsTable hdfsTable = (HdfsTable) table_;
if (!hdfsTable.isStatsExtrapolationEnabled()) {
throw new AnalysisException(String.format(
"COMPUTE STATS TABLESAMPLE requires stats extrapolation which is disabled.\n" +
"Stats extrapolation can be enabled service-wide with %s=true or by altering " +
"the table to have tblproperty %s=true",
"--enable_stats_extrapolation",
HdfsTable.TBL_PROP_ENABLE_STATS_EXTRAPOLATION));
}
sampleParams_.analyze(analyzer);
long sampleSeed;
@@ -592,7 +601,6 @@ public class ComputeStatsStmt extends StatementBase {
// Compute the sample of files and set 'sampleFileBytes_'.
long minSampleBytes = analyzer.getQueryOptions().compute_stats_min_sample_size;
long samplePerc = sampleParams_.getPercentBytes();
HdfsTable hdfsTable = (HdfsTable) table_;
Map<Long, List<FileDescriptor>> sample = hdfsTable.getFilesSample(
hdfsTable.getPartitions(), samplePerc, minSampleBytes, sampleSeed);
long sampleFileBytes = 0;
@@ -696,7 +704,7 @@ public class ComputeStatsStmt extends StatementBase {
*/
private boolean updateTableStatsOnly() {
if (!(table_ instanceof HdfsTable)) return true;
return !isIncremental_ && BackendConfig.INSTANCE.enableStatsExtrapolation();
return !isIncremental_ && ((HdfsTable) table_).isStatsExtrapolationEnabled();
}
/**

File: fe/src/main/java/org/apache/impala/catalog/HdfsTable.java

@@ -126,6 +126,12 @@ public class HdfsTable extends Table {
// Table property key for skip.header.line.count
public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
// Table property key for overriding the Impalad-wide --enable_stats_extrapolation
// setting for a specific table. By default, tables do not have the property set and
// rely on the Impalad-wide --enable_stats_extrapolation flag.
public static final String TBL_PROP_ENABLE_STATS_EXTRAPOLATION =
"impala.enable.stats.extrapolation";
// Average memory requirements (in bytes) for storing the metadata of a partition.
private static final long PER_PARTITION_MEM_USAGE_BYTES = 2048;
@@ -1951,7 +1957,7 @@ public class HdfsTable extends Table {
* Otherwise, returns a value >= 1.
*/
public long getExtrapolatedNumRows(long fileBytes) {
if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) return -1;
if (!isStatsExtrapolationEnabled()) return -1;
if (fileBytes == 0) return 0;
if (fileBytes < 0) return -1;
if (tableStats_.num_rows < 0 || tableStats_.total_file_bytes <= 0) return -1;
@@ -1961,6 +1967,18 @@ public class HdfsTable extends Table {
return (long) Math.max(1, Math.round(extrapolatedNumRows));
}
/**
* Returns true if stats extrapolation is enabled for this table, false otherwise.
* Reconciles the Impalad-wide --enable_stats_extrapolation flag and the
* TBL_PROP_ENABLE_STATS_EXTRAPOLATION table property.
*/
public boolean isStatsExtrapolationEnabled() {
org.apache.hadoop.hive.metastore.api.Table msTbl = getMetaStoreTable();
String propVal = msTbl.getParameters().get(TBL_PROP_ENABLE_STATS_EXTRAPOLATION);
if (propVal == null) return BackendConfig.INSTANCE.isStatsExtrapolationEnabled();
return Boolean.parseBoolean(propVal);
}
/**
* Returns statistics on this table as a tabular result set. Used for the
* SHOW TABLE STATS statement. The schema of the returned TResultSet is set
@@ -1978,7 +1996,7 @@ public class HdfsTable extends Table {
resultSchema.addToColumns(colDesc);
}
boolean statsExtrap = BackendConfig.INSTANCE.enableStatsExtrapolation();
boolean statsExtrap = isStatsExtrapolationEnabled();
resultSchema.addToColumns(new TColumn("#Rows", Type.BIGINT.toThrift()));
if (statsExtrap) {

File: fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java

@@ -1067,7 +1067,7 @@ public class HdfsScanNode extends ScanNode {
output.append(getStatsExplainString(detailPrefix));
output.append("\n");
String extrapRows = String.valueOf(extrapolatedNumRows_);
if (!BackendConfig.INSTANCE.enableStatsExtrapolation()) {
if (!tbl_.isStatsExtrapolationEnabled()) {
extrapRows = "disabled";
} else if (extrapolatedNumRows_ == -1) {
extrapRows = "unavailable";

File: fe/src/main/java/org/apache/impala/service/BackendConfig.java

@@ -54,7 +54,7 @@ public class BackendConfig {
return !Strings.isNullOrEmpty(backendCfg_.lineage_event_log_dir);
}
public long getIncStatsMaxSize() { return backendCfg_.inc_stats_size_limit_bytes; }
public boolean enableStatsExtrapolation() {
public boolean isStatsExtrapolationEnabled() {
return backendCfg_.enable_stats_extrapolation;
}
public boolean isAuthToLocalEnabled() {

File: fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java

@@ -1279,7 +1279,67 @@ public class AnalyzeDDLTest extends FrontendTestBase {
TBackendGflags gflags = BackendConfig.INSTANCE.getBackendCfg();
boolean origEnableStatsExtrapolation = gflags.isEnable_stats_extrapolation();
try {
// Setup for testing combinations of extrapolation config options.
addTestDb("extrap_config", null);
addTestTable("create table extrap_config.tbl_prop_unset (i int)");
addTestTable("create table extrap_config.tbl_prop_false (i int) " +
"tblproperties('impala.enable.stats.extrapolation'='false')");
addTestTable("create table extrap_config.tbl_prop_true (i int) " +
"tblproperties('impala.enable.stats.extrapolation'='true')");
String stmt = "compute stats %s tablesample system (10)";
String err = "COMPUTE STATS TABLESAMPLE requires stats extrapolation";
// Test --enable_stats_extrapolation=false
gflags.setEnable_stats_extrapolation(false);
// Table property unset --> Extrapolation disabled
AnalysisError(String.format(stmt, "extrap_config.tbl_prop_unset"), err);
// Table property false --> Extrapolation disabled
AnalysisError(String.format(stmt, "extrap_config.tbl_prop_false"), err);
// Table property true --> Extrapolation enabled
AnalyzesOk(String.format(stmt, "extrap_config.tbl_prop_true"));
// Test --enable_stats_extrapolation=true
gflags.setEnable_stats_extrapolation(true);
// Table property unset --> Extrapolation enabled
AnalyzesOk(String.format(stmt, "extrap_config.tbl_prop_unset"));
// Table property false --> Extrapolation disabled
AnalysisError(String.format(stmt, "extrap_config.tbl_prop_false"), err);
// Table property true --> Extrapolation enabled
AnalyzesOk(String.format(stmt, "extrap_config.tbl_prop_true"));
// Test file formats.
gflags.setEnable_stats_extrapolation(true);
checkComputeStatsStmt("compute stats functional.alltypes tablesample system (10)");
checkComputeStatsStmt(
"compute stats functional.alltypes tablesample system (55) repeatable(1)");
AnalysisError("compute stats functional.alltypes tablesample system (101)",
"Invalid percent of bytes value '101'. " +
"The percent of bytes to sample must be between 0 and 100.");
AnalysisError("compute stats functional_kudu.alltypes tablesample system (1)",
"TABLESAMPLE is only supported on HDFS tables.");
AnalysisError("compute stats functional_hbase.alltypes tablesample system (2)",
"TABLESAMPLE is only supported on HDFS tables.");
AnalysisError(
"compute stats functional.alltypes_datasource tablesample system (3)",
"TABLESAMPLE is only supported on HDFS tables.");
// Test file formats with columns whitelist.
gflags.setEnable_stats_extrapolation(true);
checkComputeStatsStmt(
"compute stats functional.alltypes (int_col, double_col) tablesample " +
"system (55) repeatable(1)",
Lists.newArrayList("int_col", "double_col"));
AnalysisError("compute stats functional.alltypes tablesample system (101)",
"Invalid percent of bytes value '101'. " +
"The percent of bytes to sample must be between 0 and 100.");
AnalysisError("compute stats functional_kudu.alltypes tablesample system (1)",
"TABLESAMPLE is only supported on HDFS tables.");
AnalysisError("compute stats functional_hbase.alltypes tablesample system (2)",
"TABLESAMPLE is only supported on HDFS tables.");
AnalysisError(
"compute stats functional.alltypes_datasource tablesample system (3)",
"TABLESAMPLE is only supported on HDFS tables.");
// Test different COMPUTE_STATS_MIN_SAMPLE_BYTES.
TQueryOptions queryOpts = new TQueryOptions();
@@ -1328,26 +1388,6 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// changes. Expect a sample between 4 and 6 of the 24 total files.
Assert.assertTrue(adjustedStmt.getEffectiveSamplingPerc() >= 4.0 / 24);
Assert.assertTrue(adjustedStmt.getEffectiveSamplingPerc() <= 6.0 / 24);
// Checks that whitelisted columns works with tablesample.
checkComputeStatsStmt(
"compute stats functional.alltypes (int_col, double_col) tablesample " +
"system (55) repeatable(1)",
Lists.newArrayList("int_col", "double_col"));
AnalysisError("compute stats functional.alltypes tablesample system (101)",
"Invalid percent of bytes value '101'. " +
"The percent of bytes to sample must be between 0 and 100.");
AnalysisError("compute stats functional_kudu.alltypes tablesample system (1)",
"TABLESAMPLE is only supported on HDFS tables.");
AnalysisError("compute stats functional_hbase.alltypes tablesample system (2)",
"TABLESAMPLE is only supported on HDFS tables.");
AnalysisError(
"compute stats functional.alltypes_datasource tablesample system (3)",
"TABLESAMPLE is only supported on HDFS tables.");
gflags.setEnable_stats_extrapolation(false);
AnalysisError("compute stats functional.alltypes tablesample system (10)",
"COMPUTE STATS TABLESAMPLE requires --enable_stats_extrapolation=true. " +
"Stats extrapolation is currently disabled.");
} finally {
gflags.setEnable_stats_extrapolation(origEnableStatsExtrapolation);
}

File: fe/src/test/java/org/apache/impala/planner/StatsExtrapolationTest.java

@@ -20,6 +20,7 @@ package org.apache.impala.planner;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.impala.catalog.HdfsTable;
@@ -32,28 +33,36 @@ import org.junit.Test;
import com.google.common.base.Preconditions;
/**
* Tests the behavior of stats extrapolation with valid, invalid, and unset stats,
* as well as extreme values and other edge cases.
* Tests the configuration options and behavior of stats extrapolation with valid,
* invalid, and unset stats, as well as extreme values and other edge cases.
*/
public class StatsExtrapolationTest extends FrontendTestBase {
/**
* Sets the row count and total file size stats in the given table.
* Unsets the corresponding statistic if a null value is passed.
* Preserves existing table properties.
*/
private void setStats(Table tbl, Long rowCount, Long totalSize) {
org.apache.hadoop.hive.metastore.api.Table msTbl =
new org.apache.hadoop.hive.metastore.api.Table();
msTbl.setParameters(new HashMap<String, String>());
org.apache.hadoop.hive.metastore.api.Table msTbl = tbl.getMetaStoreTable();
if (msTbl == null) {
msTbl = new org.apache.hadoop.hive.metastore.api.Table();
msTbl.setParameters(new HashMap<String, String>());
}
if (msTbl.getParameters() == null) {
msTbl.setParameters(new HashMap<String, String>());
}
Map<String, String> params = msTbl.getParameters();
if (rowCount != null) {
msTbl.getParameters().put(StatsSetupConst.ROW_COUNT,
String.valueOf(rowCount));
params.put(StatsSetupConst.ROW_COUNT, String.valueOf(rowCount));
} else {
params.remove(StatsSetupConst.ROW_COUNT);
}
if (totalSize != null) {
msTbl.getParameters().put(StatsSetupConst.TOTAL_SIZE,
String.valueOf(totalSize));
params.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(totalSize));
} else {
params.remove(StatsSetupConst.TOTAL_SIZE);
}
tbl.setMetaStoreTable(msTbl);
tbl.setTableStats(msTbl);
}
@@ -61,8 +70,8 @@ public class StatsExtrapolationTest extends FrontendTestBase {
long fileBytes, long expectedExtrapNumRows) {
Preconditions.checkState(tbl instanceof HdfsTable);
setStats(tbl, rowCount, totalSize);
long actualExrtapNumRows = ((HdfsTable)tbl).getExtrapolatedNumRows(fileBytes);
assertEquals(expectedExtrapNumRows, actualExrtapNumRows);
long actualExtrapNumRows = ((HdfsTable)tbl).getExtrapolatedNumRows(fileBytes);
assertEquals(expectedExtrapNumRows, actualExtrapNumRows);
}
private void testInvalidStats(Table tbl, Long rowCount, Long totalSize) {
@@ -79,7 +88,7 @@ public class StatsExtrapolationTest extends FrontendTestBase {
addTestDb("extrap_stats", null);
Table tbl = addTestTable("create table extrap_stats.t (i int)");
// Modify/restore the backend config for this test.
// Replace/restore the static backend config for this test.
TBackendGflags gflags = BackendConfig.INSTANCE.getBackendCfg();
boolean origEnableStatsExtrapolation = gflags.isEnable_stats_extrapolation();
try {
@@ -134,24 +143,56 @@ public class StatsExtrapolationTest extends FrontendTestBase {
}
@Test
public void TestStatsExtrapolationDisabled() {
addTestDb("extrap_stats", null);
Table tbl = addTestTable("create table extrap_stats.t (i int)");
public void TestStatsExtrapolationConfig() {
addTestDb("extrap_config", null);
Table propUnsetTbl =
addTestTable("create table extrap_config.tbl_prop_unset (i int)");
Table propFalseTbl =
addTestTable("create table extrap_config.tbl_prop_false (i int) " +
"tblproperties('impala.enable.stats.extrapolation'='false')");
Table propTrueTbl =
addTestTable("create table extrap_config.tbl_prop_true (i int) " +
"tblproperties('impala.enable.stats.extrapolation'='true')");
// Modify/restore the backend config for this test.
// Replace/restore the static backend config for this test.
TBackendGflags gflags = BackendConfig.INSTANCE.getBackendCfg();
boolean origEnableStatsExtrapolation = gflags.isEnable_stats_extrapolation();
try {
// Test --enable_stats_extrapolation=false
gflags.setEnable_stats_extrapolation(false);
// Table property unset --> Extrapolation disabled
configTestExtrapolationDisabled(propUnsetTbl);
// Table property false --> Extrapolation disabled
configTestExtrapolationDisabled(propFalseTbl);
// Table property true --> Extrapolation enabled
configTestExtrapolationEnabled(propTrueTbl);
// Always expect -1 even with legitimate stats.
runTest(tbl, 100L, 1000L, 0, -1);
runTest(tbl, 100L, 1000L, 100, -1);
runTest(tbl, 100L, 1000L, 1000000000, -1);
runTest(tbl, 100L, 1000L, Long.MAX_VALUE, -1);
runTest(tbl, 100L, 1000L, -100, -1);
// Test --enable_stats_extrapolation=true
gflags.setEnable_stats_extrapolation(true);
// Table property unset --> Extrapolation enabled
configTestExtrapolationEnabled(propUnsetTbl);
// Table property false --> Extrapolation disabled
configTestExtrapolationDisabled(propFalseTbl);
// Table property true --> Extrapolation enabled
configTestExtrapolationEnabled(propTrueTbl);
} finally {
gflags.setEnable_stats_extrapolation(origEnableStatsExtrapolation);
}
}
private void configTestExtrapolationDisabled(Table tbl) {
runTest(tbl, 100L, 1000L, 0, -1);
runTest(tbl, 100L, 1000L, 100, -1);
runTest(tbl, 100L, 1000L, 1000000000, -1);
runTest(tbl, 100L, 1000L, Long.MAX_VALUE, -1);
runTest(tbl, 100L, 1000L, -100, -1);
}
private void configTestExtrapolationEnabled(Table tbl) {
runTest(tbl, 100L, 1000L, 0, 0);
runTest(tbl, 100L, 1000L, 100, 10);
runTest(tbl, 100L, 1000L, 1000000000, 100000000);
runTest(tbl, 100L, 1000L, Long.MAX_VALUE, 922337203685477632L);
runTest(tbl, 100L, 1000L, -100, -1);
}
}
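As a sanity check, the expected values in configTestExtrapolationEnabled follow from a
simple linear ratio, clamped as in HdfsTable.getExtrapolatedNumRows: extrapolated rows =
fileBytes * (rowCount / totalSize), 0 for zero bytes, and at least 1 otherwise. A
hypothetical standalone check (not Impala source):

public class ExtrapolationMathCheck {
  public static void main(String[] args) {
    long rowCount = 100, totalSize = 1000;
    long[] fileBytes = {0, 100, 1000000000L};
    for (long fb : fileBytes) {
      // Scale the known row/byte ratio by the scanned bytes, clamping to >= 1.
      double extrapolated = fb * ((double) rowCount / totalSize);
      long result = (fb == 0) ? 0 : (long) Math.max(1, Math.round(extrapolated));
      System.out.println(fb + " bytes -> " + result + " rows"); // 0, 10, 100000000
    }
  }
}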

File: testdata/workloads/functional-query/queries/QueryTest/stats-extrapolation.test

@@ -2,6 +2,7 @@
---- QUERY
# This test relies on a deterministic row order so we use "sort by (id)".
create table alltypes sort by (id) like functional_parquet.alltypes;
alter table alltypes set tblproperties("impala.enable.stats.extrapolation"="true");
insert into alltypes partition(year, month)
select * from functional_parquet.alltypes where year = 2009;
====

File: tests/custom_cluster/test_stats_extrapolation.py

@@ -15,16 +15,17 @@
# specific language governing permissions and limitations
# under the License.
from os import path
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_dimensions import (
create_exec_option_dimension,
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
from tests.util.hdfs_util import NAMENODE
class TestStatsExtrapolation(CustomClusterTestSuite):
"""Minimal end-to-end test for the --enable_stats_extrapolation impalad flag. This test
primarly checks that the flag is propagated to the FE. More testing is done in FE unit
tests and metadata/test_stats_extrapolation.py."""
@classmethod
def get_workload(self):
@@ -37,121 +38,21 @@ class TestStatsExtrapolation(CustomClusterTestSuite):
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
@CustomClusterTestSuite.with_args(impalad_args=('--enable_stats_extrapolation=true'))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(impalad_args="--enable_stats_extrapolation=true")
def test_stats_extrapolation(self, vector, unique_database):
vector.get_value('exec_option')['num_nodes'] = 1
vector.get_value('exec_option')['explain_level'] = 2
self.run_test_case('QueryTest/stats-extrapolation', vector, unique_database)
@CustomClusterTestSuite.with_args(impalad_args=('--enable_stats_extrapolation=true'))
def test_compute_stats_tablesample(self, vector, unique_database):
"""COMPUTE STATS TABLESAMPLE is inherently non-deterministic due to its use of
SAMPLED_NDV() so we test it specially. The goal of this test is to ensure that
COMPUTE STATS TABLESAMPLE computes in-the-right-ballpark stats and successfully
stores them in the HMS."""
# Since our test tables are small, set the minimum sample size to 0 to make sure
# we exercise the sampling code paths.
self.client.execute("set compute_stats_min_sample_size=0")
# Test partitioned table.
# Test row count extrapolation
self.client.execute("set explain_level=2")
explain_result = self.client.execute("explain select * from functional.alltypes")
assert "extrapolated-rows=7300" in " ".join(explain_result.data)
# Test COMPUTE STATS TABLESAMPLE
part_test_tbl = unique_database + ".alltypes"
self.clone_table("functional.alltypes", part_test_tbl, True, vector)
self.__run_sampling_test(part_test_tbl, "", "functional.alltypes", 1, 3)
self.__run_sampling_test(part_test_tbl, "", "functional.alltypes", 10, 7)
self.__run_sampling_test(part_test_tbl, "", "functional.alltypes", 20, 13)
self.__run_sampling_test(part_test_tbl, "", "functional.alltypes", 100, 99)
# Test unpartitioned table.
nopart_test_tbl = unique_database + ".alltypesnopart"
self.client.execute("create table {0} as select * from functional.alltypes"\
.format(nopart_test_tbl))
# Clone to use as a baseline. We run the regular COMPUTE STATS on this table.
nopart_test_tbl_exp = unique_database + ".alltypesnopart_exp"
self.clone_table(nopart_test_tbl, nopart_test_tbl_exp, False, vector)
self.client.execute("compute stats {0}".format(nopart_test_tbl_exp))
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_exp, 1, 3)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_exp, 10, 7)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_exp, 20, 13)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_exp, 100, 99)
# Test empty table.
empty_test_tbl = unique_database + ".empty_tbl"
self.clone_table("functional.alltypes", empty_test_tbl, False, vector)
self.__run_sampling_test(empty_test_tbl, "", empty_test_tbl, 10, 7)
# Test wide table. Should not crash or error. This takes a few minutes so restrict
# to exhaustive.
if self.exploration_strategy() == "exhaustive":
wide_test_tbl = unique_database + ".wide"
self.clone_table("functional.widetable_1000_cols", wide_test_tbl, False, vector)
self.client.execute(
"compute stats {0} tablesample system(10)".format(wide_test_tbl))
# Test column subset.
column_subset_tbl = unique_database + ".column_subset"
columns = "(int_col, string_col)"
self.clone_table("functional.alltypes", column_subset_tbl, True, vector)
self.__run_sampling_test(column_subset_tbl, columns, "functional.alltypes", 1, 3)
self.__run_sampling_test(column_subset_tbl, columns, "functional.alltypes", 10, 7)
self.__run_sampling_test(column_subset_tbl, columns, "functional.alltypes", 20, 13)
self.__run_sampling_test(column_subset_tbl, columns, "functional.alltypes", 100, 99)
# Test no columns.
no_column_tbl = unique_database + ".no_columns"
columns = "()"
self.clone_table("functional.alltypes", no_column_tbl, True, vector)
self.__run_sampling_test(no_column_tbl, columns, "functional.alltypes", 10, 7)
def __run_sampling_test(self, tbl, cols, expected_tbl, perc, seed):
"""Drops stats on 'tbl' and then runs COMPUTE STATS TABLESAMPLE on 'tbl' with the
given column restriction clause, sampling percent and random seed. Checks that
the resulting table and column stats are reasonably close to those of
'expected_tbl'."""
self.client.execute("drop stats {0}".format(tbl))
self.client.execute("compute stats {0}{1} tablesample system ({2}) repeatable ({3})"\
.format(tbl, cols, perc, seed))
self.__check_table_stats(tbl, expected_tbl)
self.__check_column_stats(tbl, expected_tbl)
def __check_table_stats(self, tbl, expected_tbl):
"""Checks that the row counts reported in SHOW TABLE STATS on 'tbl' are within 2x
of those reported for 'expected_tbl'. Assumes that COMPUTE STATS was previously run
on 'expected_table' and that COMPUTE STATS TABLESAMPLE was run on 'tbl'."""
actual = self.client.execute("show table stats {0}".format(tbl))
expected = self.client.execute("show table stats {0}".format(expected_tbl))
assert len(actual.data) == len(expected.data)
assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
rows_col_idx = col_names.index("#ROWS")
extrap_rows_col_idx = col_names.index("EXTRAP #ROWS")
for i in xrange(0, len(actual.data)):
act_cols = actual.data[i].split("\t")
exp_cols = expected.data[i].split("\t")
assert int(exp_cols[rows_col_idx]) >= 0
self.appx_equals(\
int(act_cols[extrap_rows_col_idx]), int(exp_cols[rows_col_idx]), 2)
# Only the table-level row count is stored. The partition row counts
# are extrapolated.
if act_cols[0] == "Total":
self.appx_equals(
int(act_cols[rows_col_idx]), int(exp_cols[rows_col_idx]), 2)
elif len(actual.data) > 1:
# Partition row count is expected to not be set.
assert int(act_cols[rows_col_idx]) == -1
def __check_column_stats(self, tbl, expected_tbl):
"""Checks that the NDVs in SHOW COLUMNS STATS on 'tbl' are within 2x of those
reported for 'expected_tbl'. Assumes that COMPUTE STATS was previously run
on 'expected_table' and that COMPUTE STATS TABLESAMPLE was run on 'tbl'."""
actual = self.client.execute("show column stats {0}".format(tbl))
expected = self.client.execute("show column stats {0}".format(expected_tbl))
assert len(actual.data) == len(expected.data)
assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
self.client.execute(
"compute stats {0} tablesample system (13)".format(part_test_tbl))
# Check that column stats were set.
col_stats = self.client.execute("show column stats {0}".format(part_test_tbl))
col_names = [fs.name.upper() for fs in col_stats.schema.fieldSchemas]
ndv_col_idx = col_names.index("#DISTINCT VALUES")
for i in xrange(0, len(actual.data)):
act_cols = actual.data[i].split("\t")
exp_cols = expected.data[i].split("\t")
assert int(exp_cols[ndv_col_idx]) >= 0
self.appx_equals(int(act_cols[ndv_col_idx]), int(exp_cols[ndv_col_idx]), 2)
for row in col_stats.data:
assert int(row.split("\t")[ndv_col_idx]) >= 0

File: tests/metadata/test_stats_extrapolation.py (new file)

@@ -0,0 +1,175 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from os import path
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import (
create_exec_option_dimension,
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
class TestStatsExtrapolation(ImpalaTestSuite):
"""Test stats extrapolation and compute stats tablesample. Stats extrapolation is
enabled via table property and not via the impalad startup flag so these tests can be
run as regular tests (non-custom-cluster) and in parallel with other tests."""
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestStatsExtrapolation, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
def test_stats_extrapolation(self, vector, unique_database):
vector.get_value('exec_option')['num_nodes'] = 1
vector.get_value('exec_option')['explain_level'] = 2
self.run_test_case('QueryTest/stats-extrapolation', vector, unique_database)
def test_compute_stats_tablesample(self, vector, unique_database):
"""COMPUTE STATS TABLESAMPLE is inherently non-deterministic due to its use of
SAMPLED_NDV() so we test it specially. The goal of this test is to ensure that
COMPUTE STATS TABLESAMPLE computes in-the-right-ballpark stats and successfully
stores them in the HMS."""
# Since our test tables are small, set the minimum sample size to 0 to make sure
# we exercise the sampling code paths.
self.client.execute("set compute_stats_min_sample_size=0")
# Test partitioned table.
part_test_tbl = unique_database + ".alltypes"
self.clone_table("functional.alltypes", part_test_tbl, True, vector)
# Clone to use as a baseline. We run the regular COMPUTE STATS on this table.
part_test_tbl_base = unique_database + ".alltypes_base"
self.clone_table(part_test_tbl, part_test_tbl_base, True, vector)
self.client.execute("compute stats {0}".format(part_test_tbl_base))
# Enable stats extrapolation on both tables to match SHOW output.
self.__set_extrapolation_tblprop(part_test_tbl)
self.__set_extrapolation_tblprop(part_test_tbl_base)
self.__run_sampling_test(part_test_tbl, "", part_test_tbl_base, 1, 3)
self.__run_sampling_test(part_test_tbl, "", part_test_tbl_base, 10, 7)
self.__run_sampling_test(part_test_tbl, "", part_test_tbl_base, 20, 13)
self.__run_sampling_test(part_test_tbl, "", part_test_tbl_base, 100, 99)
# Test unpartitioned table.
nopart_test_tbl = unique_database + ".alltypesnopart"
self.client.execute("create table {0} as select * from functional.alltypes"\
.format(nopart_test_tbl))
# Clone to use as a baseline. We run the regular COMPUTE STATS on this table.
nopart_test_tbl_base = unique_database + ".alltypesnopart_base"
self.clone_table(nopart_test_tbl, nopart_test_tbl_base, False, vector)
self.client.execute("compute stats {0}".format(nopart_test_tbl_base))
# Enable stats extrapolation on both tables to match SHOW output.
self.__set_extrapolation_tblprop(nopart_test_tbl)
self.__set_extrapolation_tblprop(nopart_test_tbl_base)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_base, 1, 3)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_base, 10, 7)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_base, 20, 13)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_base, 100, 99)
# Test empty table.
empty_test_tbl = unique_database + ".empty_tbl"
self.clone_table("functional.alltypes", empty_test_tbl, False, vector)
self.__set_extrapolation_tblprop(empty_test_tbl)
self.__run_sampling_test(empty_test_tbl, "", empty_test_tbl, 10, 7)
# Test column subset.
column_subset_tbl = unique_database + ".column_subset"
columns = "(int_col, string_col)"
self.clone_table("functional.alltypes", column_subset_tbl, True, vector)
self.__set_extrapolation_tblprop(column_subset_tbl)
self.__run_sampling_test(column_subset_tbl, columns, part_test_tbl_base, 1, 3)
self.__run_sampling_test(column_subset_tbl, columns, part_test_tbl_base, 10, 7)
self.__run_sampling_test(column_subset_tbl, columns, part_test_tbl_base, 20, 13)
self.__run_sampling_test(column_subset_tbl, columns, part_test_tbl_base, 100, 99)
# Test no columns.
no_column_tbl = unique_database + ".no_columns"
columns = "()"
self.clone_table("functional.alltypes", no_column_tbl, True, vector)
self.__set_extrapolation_tblprop(no_column_tbl)
self.__run_sampling_test(no_column_tbl, columns, part_test_tbl_base, 10, 7)
# Test wide table. Should not crash or error. This takes a few minutes so restrict
# to exhaustive.
if self.exploration_strategy() == "exhaustive":
wide_test_tbl = unique_database + ".wide"
self.clone_table("functional.widetable_1000_cols", wide_test_tbl, False, vector)
self.__set_extrapolation_tblprop(wide_test_tbl)
self.client.execute(
"compute stats {0} tablesample system(10)".format(wide_test_tbl))
def __set_extrapolation_tblprop(self, tbl):
"""Alters the given table to enable stats extrapolation via tblproperty."""
self.client.execute("alter table {0} set "\
"tblproperties('impala.enable.stats.extrapolation'='true')".format(tbl))
def __run_sampling_test(self, tbl, cols, expected_tbl, perc, seed):
"""Drops stats on 'tbl' and then runs COMPUTE STATS TABLESAMPLE on 'tbl' with the
given column restriction clause, sampling percent and random seed. Checks that
the resulting table and column stats are reasonably close to those of
'expected_tbl'."""
self.client.execute("drop stats {0}".format(tbl))
self.client.execute("compute stats {0}{1} tablesample system ({2}) repeatable ({3})"\
.format(tbl, cols, perc, seed))
self.__check_table_stats(tbl, expected_tbl)
self.__check_column_stats(tbl, expected_tbl)
def __check_table_stats(self, tbl, expected_tbl):
"""Checks that the row counts reported in SHOW TABLE STATS on 'tbl' are within 2x
of those reported for 'expected_tbl'. Assumes that COMPUTE STATS was previously run
on 'expected_table' and that COMPUTE STATS TABLESAMPLE was run on 'tbl'."""
actual = self.client.execute("show table stats {0}".format(tbl))
expected = self.client.execute("show table stats {0}".format(expected_tbl))
assert len(actual.data) == len(expected.data)
assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
rows_col_idx = col_names.index("#ROWS")
extrap_rows_col_idx = col_names.index("EXTRAP #ROWS")
for i in xrange(0, len(actual.data)):
act_cols = actual.data[i].split("\t")
exp_cols = expected.data[i].split("\t")
assert int(exp_cols[rows_col_idx]) >= 0
self.appx_equals(\
int(act_cols[extrap_rows_col_idx]), int(exp_cols[rows_col_idx]), 2)
# Only the table-level row count is stored. The partition row counts
# are extrapolated.
if act_cols[0] == "Total":
self.appx_equals(
int(act_cols[rows_col_idx]), int(exp_cols[rows_col_idx]), 2)
elif len(actual.data) > 1:
# Partition row count is expected to not be set.
assert int(act_cols[rows_col_idx]) == -1
def __check_column_stats(self, tbl, expected_tbl):
"""Checks that the NDVs in SHOW COLUMNS STATS on 'tbl' are within 2x of those
reported for 'expected_tbl'. Assumes that COMPUTE STATS was previously run
on 'expected_table' and that COMPUTE STATS TABLESAMPLE was run on 'tbl'."""
actual = self.client.execute("show column stats {0}".format(tbl))
expected = self.client.execute("show column stats {0}".format(expected_tbl))
assert len(actual.data) == len(expected.data)
assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
ndv_col_idx = col_names.index("#DISTINCT VALUES")
for i in xrange(0, len(actual.data)):
act_cols = actual.data[i].split("\t")
exp_cols = expected.data[i].split("\t")
assert int(exp_cols[ndv_col_idx]) >= 0
self.appx_equals(int(act_cols[ndv_col_idx]), int(exp_cols[ndv_col_idx]), 2)