diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc index 1a8649f73..e73946fc9 100644 --- a/be/src/service/query-exec-state.cc +++ b/be/src/service/query-exec-state.cc @@ -53,6 +53,7 @@ namespace impala { static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem"; static const string PER_HOST_VCORES_KEY = "Estimated Per-Host VCores"; static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats"; +static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats"; ImpalaServer::QueryExecState::QueryExecState( const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend, @@ -374,6 +375,19 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest( summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str()); } + if (!query_exec_request.query_ctx.__isset.parent_query_id && + query_exec_request.query_ctx.__isset.tables_with_corrupt_stats && + !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) { + stringstream ss; + const vector<TTableName>& tbls = + query_exec_request.query_ctx.tables_with_corrupt_stats; + for (int i = 0; i < tbls.size(); ++i) { + if (i != 0) ss << ","; + ss << tbls[i].db_name << "." << tbls[i].table_name; + } + summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str()); + } + + // If desc_tbl is not set, query has SELECT with no FROM. In that // case, the query can only have a single fragment, and that fragment needs to be // executed by the coordinator. This check confirms that. diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index bc029ee7f..66d435948 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -186,6 +186,9 @@ struct TQueryCtx { // Set if this is a child query (e.g.
a child of a COMPUTE STATS request) 9: optional Types.TUniqueId parent_query_id + + // List of tables suspected to have corrupt stats + 10: optional list<CatalogObjects.TTableName> tables_with_corrupt_stats } // Context of a fragment instance, including its unique id, the total number diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java index cf7d72828..57fc46557 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java @@ -112,6 +112,10 @@ public class HdfsScanNode extends ScanNode { private final Map<TupleDescriptor, List<Expr>> collectionConjuncts_ = Maps.newLinkedHashMap(); + // Indicates corrupt table stats based on the number of non-empty scan ranges and + // numRows set to 0. Set in computeStats(). + private boolean hasCorruptTableStats_; + /** * Constructs node to scan given data files of table 'tbl_'. */ @@ -648,12 +652,19 @@ public class HdfsScanNode extends ScanNode { numPartitionsMissingStats_ = 0; if (tbl_.getPartitions().isEmpty()) { cardinality_ = tbl_.getNumRows(); + if ((cardinality_ < -1 || cardinality_ == 0) && tbl_.getTotalHdfsBytes() > 0) { + hasCorruptTableStats_ = true; + } } else { cardinality_ = 0; totalFiles_ = 0; totalBytes_ = 0; boolean hasValidPartitionCardinality = false; for (HdfsPartition p: partitions_) { + // Check for corrupt table stats + if ((p.getNumRows() == 0 || p.getNumRows() < -1) && p.getSize() > 0) { + hasCorruptTableStats_ = true; + } // ignore partitions with missing stats in the hope they don't matter // enough to change the planning outcome if (p.getNumRows() > -1) { @@ -879,4 +890,7 @@ public class HdfsScanNode extends ScanNode { return (long) RuntimeEnv.INSTANCE.getNumCores() * (long) THREADS_PER_CORE * MAX_IO_BUFFERS_PER_THREAD * IO_MGR_BUFFER_SIZE; } + + @Override + public boolean hasCorruptTableStats() { return hasCorruptTableStats_; } } diff --git
a/fe/src/main/java/com/cloudera/impala/planner/Planner.java b/fe/src/main/java/com/cloudera/impala/planner/Planner.java index ea7be015c..9a32d5791 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/Planner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/Planner.java @@ -160,6 +160,22 @@ public class Planner { request.per_host_vcores)); hasHeader = true; } + + // IMPALA-1983 In the case of corrupt stats, issue a warning for all queries except + // child queries of 'compute stats'. + if (!request.query_ctx.isSetParent_query_id() && + request.query_ctx.isSetTables_with_corrupt_stats() && + !request.query_ctx.getTables_with_corrupt_stats().isEmpty()) { + List<String> tableNames = Lists.newArrayList(); + for (TTableName tableName: request.query_ctx.getTables_with_corrupt_stats()) { + tableNames.add(tableName.db_name + "." + tableName.table_name); + } + str.append("WARNING: The following tables have potentially corrupt table\n" + + "statistics. Drop and re-compute statistics to resolve this problem.\n" + + Joiner.on(", ").join(tableNames) + "\n"); + hasHeader = true; + } + + // Append warning about tables missing stats except for child queries of // 'compute stats'. The parent_query_id is only set for compute stats child queries.
if (!request.query_ctx.isSetParent_query_id() && @@ -173,6 +189,7 @@ public class Planner { "and/or column statistics.\n" + Joiner.on(", ").join(tableNames) + "\n"); hasHeader = true; } + if (request.query_ctx.isDisable_spilling()) { str.append("WARNING: Spilling is disabled for this query as a safety guard.\n" + "Reason: Query option disable_unsafe_spills is set, at least one table\n" + diff --git a/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java index 80db2f082..6f3a577ce 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java @@ -124,6 +124,11 @@ abstract public class ScanNode extends PlanNode { return false; } + /** + * Returns true, if the scanned table is suspected to have corrupt table stats, + * in particular, if the scan is non-empty and 'numRows' is 0 or negative (but not -1). + */ + public boolean hasCorruptTableStats() { return false; } /** * Helper function to parse a "host:port" address string into TNetworkAddress diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java index 48155cf7f..417630ad0 100644 --- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java +++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java @@ -890,6 +890,8 @@ public class Frontend { // Also assemble list of tables names missing stats for assembling a warning message. 
LOG.debug("get scan range locations"); Set<TTableName> tablesMissingStats = Sets.newTreeSet(); + // Assemble a similar list for corrupt stats + Set<TTableName> tablesWithCorruptStats = Sets.newTreeSet(); for (ScanNode scanNode: scanNodes) { queryExecRequest.putToPer_node_scan_ranges( scanNode.getId().asInt(), @@ -897,11 +899,18 @@ public class Frontend { if (scanNode.isTableMissingStats()) { tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift()); } + if (scanNode.hasCorruptTableStats()) { + tablesWithCorruptStats.add(scanNode.getTupleDesc().getTableName().toThrift()); + } } + queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList()); for (TTableName tableName: tablesMissingStats) { queryCtx.addToTables_missing_stats(tableName); } + for (TTableName tableName: tablesWithCorruptStats) { + queryCtx.addToTables_with_corrupt_stats(tableName); + } // Optionally disable spilling in the backend. Allow spilling if there are plan hints // or if all tables have stats. diff --git a/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java b/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java index aaecf7dba..4c3262807 100644 --- a/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java +++ b/fe/src/main/java/com/cloudera/impala/util/MaxRowsProcessedVisitor.java @@ -34,7 +34,8 @@ public class MaxRowsProcessedVisitor implements Visitor<PlanNode> { if (caller instanceof ScanNode) { long tmp = caller.getInputCardinality(); ScanNode scan = (ScanNode) caller; - if (scan.isTableMissingTableStats() && !scan.hasLimit()) { + if ((scan.isTableMissingTableStats() || + scan.hasCorruptTableStats()) && !scan.hasLimit()) { abort_ = true; return; } diff --git a/testdata/workloads/functional-query/queries/QueryTest/corrupt_stats.test b/testdata/workloads/functional-query/queries/QueryTest/corrupt_stats.test new file mode 100644 index 000000000..745e18c3d --- /dev/null +++
b/testdata/workloads/functional-query/queries/QueryTest/corrupt_stats.test @@ -0,0 +1,184 @@ +==== +---- QUERY +use compute_stats_db; +==== +---- QUERY +create table corrupted (id int, name string) partitioned by (org int); +==== +---- QUERY +insert into corrupted partition (org=1) values (1, "Martin"), (2, "Hans"), (3, "Peter"); +==== +---- QUERY +insert into corrupted partition (org=2) values (4, "Martin"), (5, "Hans"), (6, "Peter"); +==== +---- QUERY +show table stats corrupted; +---- LABELS +ORG, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION +---- RESULTS +'1',-1,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=1' +'2',-1,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=2' +'Total',-1,2,'48B','0B','','','','' +---- TYPES +STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +compute stats corrupted; +==== +---- QUERY +show table stats corrupted; +---- LABELS +ORG, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION +---- RESULTS +'1',3,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=1' +'2',3,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=2' +'Total',6,2,'48B','0B','','','','' +---- TYPES +STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +alter table corrupted partition(org=1) set tblproperties('numRows'='0'); +==== +---- QUERY +invalidate metadata corrupted; +==== +---- QUERY +show table stats corrupted; +---- LABELS +ORG, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION +---- RESULTS +'1',0,1,'24B','NOT CACHED','NOT 
CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=1' +'2',3,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=2' +'Total',6,2,'48B','0B','','','','' +---- TYPES +STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +explain select count(*) from corrupted where org = 1; +---- RESULTS: VERIFY_IS_SUBSET +'WARNING: The following tables have potentially corrupt table' +'statistics. Drop and re-compute statistics to resolve this problem.' +'compute_stats_db.corrupted' +'' +'03:AGGREGATE [FINALIZE]' +'| output: count:merge(*)' +'|' +'02:EXCHANGE [UNPARTITIONED]' +'|' +'01:AGGREGATE' +'| output: count(*)' +'|' +'00:SCAN HDFS [compute_stats_db.corrupted]' +' partitions=1/2 files=1 size=24B' +---- TYPES +STRING +==== +---- QUERY +alter table corrupted partition(org=1) set tblproperties('numRows'='3'); +alter table corrupted set tblproperties('numRows'='0'); +==== +---- QUERY +show table stats corrupted; +---- LABELS +ORG, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION +---- RESULTS +'1',3,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=1' +'2',3,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=2' +'Total',0,2,'48B','0B','','','','' +---- TYPES +STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +explain select count(*) from corrupted; +---- RESULTS: VERIFY_IS_SUBSET +'01:AGGREGATE [FINALIZE]' +'| output: count(*)' +'|' +'00:SCAN HDFS [compute_stats_db.corrupted]' +' partitions=2/2 files=2 size=48B' +---- TYPES +STRING +==== +---- QUERY +alter table corrupted set tblproperties('numRows'='6'); +==== +---- QUERY +show table stats corrupted; +---- LABELS +ORG, #ROWS, #FILES, SIZE, BYTES CACHED, CACHE 
REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION +---- RESULTS +'1',3,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=1' +'2',3,1,'24B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted/org=2' +'Total',6,2,'48B','0B','','','','' +---- TYPES +STRING, BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +explain select count(*) from corrupted; +---- RESULTS: VERIFY_IS_SUBSET +'01:AGGREGATE [FINALIZE]' +'| output: count(*)' +'|' +'00:SCAN HDFS [compute_stats_db.corrupted]' +' partitions=2/2 files=2 size=48B' +---- TYPES +STRING +==== +---- QUERY +create table corrupted_no_part (id int); +insert into corrupted_no_part values (1),(2),(3); +compute stats corrupted_no_part; +==== +---- QUERY +show table stats corrupted_no_part; +---- LABELS +#ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION +---- RESULTS +3,1,'6B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted_no_part' +---- TYPES +BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +-- Check that small query optimization is executed. 
+explain select count(*) from corrupted_no_part; +---- RESULTS: VERIFY_IS_SUBSET +'01:AGGREGATE [FINALIZE]' +'| output: count(*)' +'|' +'00:SCAN HDFS [compute_stats_db.corrupted_no_part]' +' partitions=1/1 files=1 size=6B' +---- TYPES +STRING +==== +---- QUERY +alter table corrupted_no_part set tblproperties('numRows'='0'); +==== +---- QUERY +show table stats corrupted_no_part; +---- LABELS +#ROWS, #FILES, SIZE, BYTES CACHED, CACHE REPLICATION, FORMAT, INCREMENTAL STATS, LOCATION +---- RESULTS +-1,1,'6B','NOT CACHED','NOT CACHED','TEXT','false','hdfs://localhost:20500/test-warehouse/compute_stats_db.db/corrupted_no_part' +---- TYPES +BIGINT, BIGINT, STRING, STRING, STRING, STRING, STRING, STRING +==== +---- QUERY +-- After setting num rows to 0, the HMS will set it to -1 and avoids bad behavior. +explain select count(*) from corrupted_no_part; +---- RESULTS: VERIFY_IS_SUBSET +'WARNING: The following tables are missing relevant table and/or column statistics.' +'compute_stats_db.corrupted_no_part' +'' +'03:AGGREGATE [FINALIZE]' +'| output: count:merge(*)' +'|' +'02:EXCHANGE [UNPARTITIONED]' +'|' +'01:AGGREGATE' +'| output: count(*)' +'|' +'00:SCAN HDFS [compute_stats_db.corrupted_no_part]' +' partitions=1/1 files=1 size=6B' +---- TYPES +STRING +==== \ No newline at end of file diff --git a/tests/metadata/test_compute_stats.py b/tests/metadata/test_compute_stats.py index bee770950..c5e266c93 100644 --- a/tests/metadata/test_compute_stats.py +++ b/tests/metadata/test_compute_stats.py @@ -115,4 +115,25 @@ class TestComputeStats(ImpalaTestSuite): show_result = \ self.execute_query("show table stats %s.%s" % (self.TEST_DB_NAME, table_name)) assert(len(show_result.data) == 2) - assert("1\tpval\t8" in show_result.data[0]) \ No newline at end of file + assert("1\tpval\t8" in show_result.data[0]) + + +@SkipIfS3.insert # S3: missing coverage: compute stats +@SkipIf.not_default_fs # Isilon: Missing coverage: compute stats +class TestCorruptTableStats(TestComputeStats): + + 
@classmethod + def add_test_dimensions(cls): + super(TestComputeStats, cls).add_test_dimensions() + cls.TestMatrix.add_dimension(create_exec_option_dimension( + disable_codegen_options=[False], exec_single_node_option=[100])) + # Do not run these tests using all dimensions because the expected results + # are different for different file formats. + cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload())) + + @pytest.mark.execute_serially + def test_corrupted_stats(self, vector): + """IMPALA-1983: Test that in the presence of corrupt table statistics a warning is + issued and the small query optimization is disabled.""" + if self.exploration_strategy() != 'exhaustive': pytest.skip("Only run in exhaustive") + self.run_test_case('QueryTest/corrupt_stats', vector)