diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index ac3d7bcc7..550142d6f 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -285,7 +285,7 @@ class ExecNode { int id() const { return id_; } TPlanNodeType::type type() const { return type_; } - const PlanNode& plan_node() { return plan_node_; } + const PlanNode& plan_node() const { return plan_node_; } /// Returns a unique label for this ExecNode of the form "PLAN_NODE_TYPE(id=[int])", /// for example, EXCHANGE_NODE (id=2). diff --git a/be/src/exec/tuple-cache-node.cc b/be/src/exec/tuple-cache-node.cc index 57bc17f8e..455245f6b 100644 --- a/be/src/exec/tuple-cache-node.cc +++ b/be/src/exec/tuple-cache-node.cc @@ -18,6 +18,7 @@ #include #include "exec/exec-node-util.h" +#include "exec/hdfs-scan-node-base.h" #include "exec/tuple-cache-node.h" #include "exec/tuple-file-reader.h" #include "exec/tuple-file-writer.h" @@ -432,37 +433,102 @@ void TupleCacheNode::DebugString(int indentation_level, stringstream* out) const *out << ")"; } +// This hashes all the fields of the HdfsPartitionDescriptor, except: +// 1. block_size: Only used for writing, so it doesn't matter for reading +// 2. location: The location is hashed as part of the scan range +// 3. id: The partition id is not stable over time +// As a substitute for the "id", this uses the partition key expr hash, which +// is a stable identifier for the partition. +uint32_t TupleCacheNode::HashHdfsPartitionDescriptor( + const HdfsPartitionDescriptor* partition_desc, uint32_t seed) { + uint32_t hash = seed; + char line_delim = partition_desc->line_delim(); + hash = HashUtil::Hash(&line_delim, sizeof(line_delim), hash); + char field_delim = partition_desc->field_delim(); + hash = HashUtil::Hash(&field_delim, sizeof(field_delim), hash); + char collection_delim = partition_desc->collection_delim(); + hash = HashUtil::Hash(&collection_delim, sizeof(collection_delim), hash); + char escape_char = partition_desc->escape_char(); + hash = HashUtil::Hash(&escape_char, sizeof(escape_char), hash); + std::string file_format = to_string(partition_desc->file_format()); + hash = HashUtil::Hash(file_format.data(), file_format.length(), hash); + hash = HashUtil::Hash(partition_desc->encoding_value().data(), + partition_desc->encoding_value().length(), hash); + std::string json_binary_format = to_string(partition_desc->json_binary_format()); + hash = HashUtil::Hash(json_binary_format.data(), json_binary_format.length(), hash); + uint32_t partition_key_expr_hash = partition_desc->partition_key_expr_hash(); + hash = HashUtil::Hash(&partition_key_expr_hash, sizeof(partition_key_expr_hash), hash); + return hash; +} + +uint32_t TupleCacheNode::HashHdfsFileSplit(const HdfsFileSplitPB& split, uint32_t seed) { + uint32_t hash = seed; + if (split.has_relative_path() && !split.relative_path().empty()) { + hash = HashUtil::Hash( + split.relative_path().data(), split.relative_path().length(), hash); + DCHECK(split.has_partition_path_hash()); + int32_t partition_path_hash = split.partition_path_hash(); + hash = HashUtil::Hash(&partition_path_hash, sizeof(partition_path_hash), hash); + } else if (split.has_absolute_path() && !split.absolute_path().empty()) { + hash = HashUtil::Hash( + split.absolute_path().data(), split.absolute_path().length(), hash); + } else { + DCHECK("Either relative_path or absolute_path must be set"); + } + DCHECK(split.has_offset()); + int64_t offset = split.offset(); + hash = HashUtil::Hash(&offset, sizeof(offset), hash); + DCHECK(split.has_length()); + int64_t length = 
split.length();
+  hash = HashUtil::Hash(&length, sizeof(length), hash);
+  DCHECK(split.has_mtime());
+  int64_t mtime = split.mtime();
+  hash = HashUtil::Hash(&mtime, sizeof(mtime), hash);
+  return hash;
+}
+
 void TupleCacheNode::ComputeFragmentInstanceKey(const RuntimeState* state) {
   const PlanFragmentInstanceCtxPB& ctx = state->instance_ctx_pb();
   uint32_t hash = 0;
+  // Collect the HdfsScanNodes below this point. The HdfsScanNodes have information about
+  // the partitions that we need to include in the fragment instance key. Some locations
+  // may have a large number of scan nodes below them, so construct a map from the node
+  // id to the HdfsScanNodeBase.
+  vector<ExecNode*> scan_nodes;
+  CollectNodes(TPlanNodeType::HDFS_SCAN_NODE, &scan_nodes);
+  unordered_map<int, const HdfsScanNodeBase*> id_to_scan_node_map;
+  for (const ExecNode* exec_node : scan_nodes) {
+    const HdfsScanNodeBase* scan_node =
+        static_cast<const HdfsScanNodeBase*>(exec_node);
+    int node_id = exec_node->plan_node().tnode_->node_id;
+    DCHECK(id_to_scan_node_map.find(node_id) == id_to_scan_node_map.end())
+        << "Duplicate scan node id: " << node_id;
+    id_to_scan_node_map[node_id] = scan_node;
+  }
   for (int32_t node_id : plan_node().tnode_->tuple_cache_node.input_scan_node_ids) {
+    const HdfsTableDescriptor* hdfs_table = id_to_scan_node_map[node_id]->hdfs_table();
+    DCHECK(hdfs_table != nullptr);
     auto ranges = ctx.per_node_scan_ranges().find(node_id);
     if (ranges == ctx.per_node_scan_ranges().end()) continue;
     for (const ScanRangeParamsPB& params : ranges->second.scan_ranges()) {
       // This only supports HDFS right now
       DCHECK(params.scan_range().has_hdfs_file_split());
       const HdfsFileSplitPB& split = params.scan_range().hdfs_file_split();
-      if (split.has_relative_path() && !split.relative_path().empty()) {
-        hash = HashUtil::Hash(
-            split.relative_path().data(), split.relative_path().length(), hash);
-        DCHECK(split.has_partition_path_hash());
-        int32_t partition_path_hash = split.partition_path_hash();
-        hash = HashUtil::Hash(&partition_path_hash, sizeof(partition_path_hash), hash);
-      } else if (split.has_absolute_path() && !split.absolute_path().empty()) {
-        hash = HashUtil::Hash(
-            split.absolute_path().data(), split.absolute_path().length(), hash);
+      // Information on the partition can influence how files are processed. For example,
+      // for text files, the delimiter can be specified on the partition level. There
+      // are several such attributes. We need to incorporate the partition information
+      // into the hash for each file split.
+      const HdfsPartitionDescriptor* partition_desc =
+          hdfs_table->GetPartition(split.partition_id());
+      DCHECK(partition_desc != nullptr);
+      if (partition_desc != nullptr) {
+        hash = HashHdfsPartitionDescriptor(partition_desc, hash);
       } else {
-        DCHECK("Either relative_path or absolute_path must be set");
+        LOG(WARNING) << "Partition id " << split.partition_id()
+                     << " not found in table " << hdfs_table->fully_qualified_name()
+                     << " split filename: " << split.relative_path();
       }
-      DCHECK(split.has_offset());
-      int64_t offset = split.offset();
-      hash = HashUtil::Hash(&offset, sizeof(offset), hash);
-      DCHECK(split.has_length());
-      int64_t length = split.length();
-      hash = HashUtil::Hash(&length, sizeof(length), hash);
-      DCHECK(split.has_mtime());
-      int64_t mtime = split.mtime();
-      hash = HashUtil::Hash(&mtime, sizeof(mtime), hash);
+      hash = HashHdfsFileSplit(split, hash);
     }
   }
   fragment_instance_key_ = hash;
diff --git a/be/src/exec/tuple-cache-node.h b/be/src/exec/tuple-cache-node.h
index ef2e2dd6b..51a90bc0c 100644
--- a/be/src/exec/tuple-cache-node.h
+++ b/be/src/exec/tuple-cache-node.h
@@ -27,6 +27,8 @@ namespace impala {
 class TupleFileReader;
 class TupleFileWriter;
 class TupleTextFileWriter;
+class HdfsFileSplitPB;
+class HdfsPartitionDescriptor;
 
 class TupleCachePlanNode : public PlanNode {
  public:
@@ -85,8 +87,16 @@ private:
 
   void ReleaseResult();
 
+  // Hash the relevant attributes from an HdfsPartitionDescriptor using the specified
+  // seed.
+  uint32_t HashHdfsPartitionDescriptor(const HdfsPartitionDescriptor* partition_desc,
+      uint32_t seed);
+
+  // Hash the relevant attributes from an HdfsFileSplit using the specified seed.
+  uint32_t HashHdfsFileSplit(const HdfsFileSplitPB& split, uint32_t seed);
+
   // Construct the fragment instance part of the cache key by hashing information about
-  // inputs to this fragment (e.g. scan ranges).
+  // inputs to this fragment (e.g. scan ranges and partition settings).
   void ComputeFragmentInstanceKey(const RuntimeState *state);
 
   /// Reader/Writer for caching
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index d4bc2f62d..5af38fd43 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -35,6 +35,7 @@
 #include "gen-cpp/PlanNodes_types.h"
 #include "rpc/thrift-util.h"
 #include "runtime/runtime-state.h"
+#include "util/hash-util.h"
 
 #include "common/names.h"
 
@@ -208,6 +209,15 @@ string TableDescriptor::DebugString() const {
   return out.str();
 }
 
+static uint32_t HashPartitionKeyExprs(const vector<TExpr>& partition_key_exprs) {
+  stringstream stream;
+  for (const TExpr& texpr : partition_key_exprs) {
+    stream << texpr;
+  }
+  string s = stream.str();
+  return HashUtil::Hash(s.data(), s.length(), 0);
+}
+
 HdfsPartitionDescriptor::HdfsPartitionDescriptor(
     const THdfsTable& thrift_table, const THdfsPartition& thrift_partition)
   : id_(thrift_partition.id),
@@ -222,6 +232,7 @@ HdfsPartitionDescriptor::HdfsPartitionDescriptor(
   json_binary_format_ = sd.jsonBinaryFormat;
   encoding_value_ = sd.__isset.encodingValue ?
sd.encodingValue : ""; DecompressLocation(thrift_table, thrift_partition, &location_); + partition_key_expr_hash_ = HashPartitionKeyExprs(thrift_partition_key_exprs_); } string HdfsPartitionDescriptor::DebugString() const { diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 8b0457286..bf18c249e 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -403,6 +403,7 @@ class HdfsPartitionDescriptor { const std::string& location() const { return location_; } int64_t id() const { return id_; } TJsonBinaryFormat::type json_binary_format() const { return json_binary_format_; } + uint32_t partition_key_expr_hash() const { return partition_key_expr_hash_; } std::string DebugString() const; /// It is safe to call the returned expr evaluators concurrently from multiple @@ -425,12 +426,15 @@ class HdfsPartitionDescriptor { // stripped. std::string location_; int64_t id_; + uint32_t partition_key_expr_hash_; /// List of literal (and therefore constant) expressions for each partition key. Their /// order corresponds to the first num_clustering_cols of the parent table. /// The Prepare()/Open()/Close() cycle is controlled by the containing descriptor table /// because the same partition descriptor may be used by multiple exec nodes with /// different lifetimes. + /// WARNING: This is only valid for a brief time and is left dangling. It can't be + /// exposed publicly. const std::vector& thrift_partition_key_exprs_; /// These evaluators are safe to be shared by all fragment instances as all expressions diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index ed47bd1f6..bdc3565b3 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -84,6 +84,7 @@ import org.apache.impala.thrift.TFileSplitGeneratorSpec; import org.apache.impala.thrift.TJsonBinaryFormat; import org.apache.impala.thrift.THdfsFileSplit; import org.apache.impala.thrift.THdfsScanNode; +import org.apache.impala.thrift.THdfsStorageDescriptor; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TOverlapPredicateDesc; import org.apache.impala.thrift.TPlanNode; @@ -2828,4 +2829,78 @@ public class HdfsScanNode extends ScanNode { } return ndvMult; } + + // Incorporate the details from this scan node into the supplied TupleCacheInfo. This + // is used when an entire scan node needs to be incorporated into a tuple cache key + // (e.g. for the build side of a join). This iterates over the partitions, hashing + // the partition information (storage descriptor and partition keys/values) as well as + // the associated scan ranges. + public void incorporateScansIntoTupleCache(TupleCacheInfo info) { + TScanRangeSpec orig = getScanRangeSpecs(); + + // Sort the partitions so it is consistent over time (the partition name is built + // from the partition column values). 
+    List<FeFsPartition> sortedPartitions = new ArrayList<>(getSampledOrRawPartitions());
+    sortedPartitions.sort(
+        (p1, p2) -> p1.getPartitionName().compareTo(p2.getPartitionName()));
+    for (FeFsPartition partition : sortedPartitions) {
+      TScanRangeSpec spec = new TScanRangeSpec();
+      boolean hasScanRange = false;
+      if (orig.isSetConcrete_ranges()) {
+        for (TScanRangeLocationList origLocList: orig.concrete_ranges) {
+          if (origLocList.scan_range.hdfs_file_split.partition_id != partition.getId()) {
+            continue;
+          }
+          hasScanRange = true;
+          // We only need the TScanRange, which provides the file segment info.
+          TScanRangeLocationList locList = new TScanRangeLocationList();
+          TScanRange scanRange = origLocList.scan_range.deepCopy();
+          if (scanRange.isSetHdfs_file_split()) {
+            // Zero out partition_id, it's not stable.
+            scanRange.hdfs_file_split.partition_id = 0;
+          }
+          locList.setScan_range(scanRange);
+          spec.addToConcrete_ranges(locList);
+        }
+        if (hasScanRange) {
+          // Reloaded partitions may have a different order. Sort for stability.
+          spec.concrete_ranges.sort(null);
+        }
+      }
+      if (orig.isSetSplit_specs()) {
+        for (TFileSplitGeneratorSpec origSplitSpec: orig.split_specs) {
+          if (origSplitSpec.partition_id != partition.getId()) continue;
+          hasScanRange = true;
+          TFileSplitGeneratorSpec splitSpec = origSplitSpec.deepCopy();
+          // Zero out partition_id, it's not stable.
+          splitSpec.partition_id = 0;
+          spec.addToSplit_specs(splitSpec);
+        }
+        // Reloaded partitions may have a different order. Sort for stability.
+        spec.split_specs.sort(null);
+      }
+
+      // We should ignore empty partitions, so only include the information if there is
+      // at least one scan range.
+      if (hasScanRange) {
+        // Incorporate the storage descriptor. This contains several fields that can
+        // impact correctness, including the escape character, separator character,
+        // json binary format, etc.
+        THdfsStorageDescriptor inputFormat =
+            partition.getInputFormatDescriptor().toThrift();
+        // Zero the block size, as it is not relevant to reads.
+        inputFormat.setBlockSize(0);
+        info.hashThrift(inputFormat);
+
+        // Hash the partition name (which includes the partition keys and values).
+        // This is necessary for cases where two partitions point to the same
+        // directory and files. Without knowing the partition keys/values, the
+        // cache can't tell them apart.
+        info.hashString(partition.getPartitionName());
+
+        // Hash the scan range information
+        info.hashThrift(spec);
+      }
+    }
+  }
 }
diff --git a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
index f1220b40d..83a1835c2 100644
--- a/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
+++ b/fe/src/main/java/org/apache/impala/planner/TupleCacheInfo.java
@@ -248,34 +248,7 @@ public class TupleCacheInfo {
     // filename, mtime, size, and offset. Others like partition_id may change after
     // reloading metadata.
     for (HdfsScanNode scanNode: inputScanNodes_) {
-      TScanRangeSpec orig = scanNode.getScanRangeSpecs();
-      TScanRangeSpec spec = new TScanRangeSpec();
-      if (orig.isSetConcrete_ranges()) {
-        for (TScanRangeLocationList origLocList: orig.concrete_ranges) {
-          // We only need the TScanRange, which provides the file segment info.
-          TScanRangeLocationList locList = new TScanRangeLocationList();
-          TScanRange scanRange = origLocList.scan_range.deepCopy();
-          if (scanRange.isSetHdfs_file_split()) {
-            // Zero out partition_id, it's not stable.
- scanRange.hdfs_file_split.partition_id = 0; - } - locList.setScan_range(scanRange); - spec.addToConcrete_ranges(locList); - } - // Reloaded partitions may have a different order. Sort for stability. - spec.concrete_ranges.sort(null); - } - if (orig.isSetSplit_specs()) { - for (TFileSplitGeneratorSpec origSplitSpec: orig.split_specs) { - TFileSplitGeneratorSpec splitSpec = origSplitSpec.deepCopy(); - // Zero out partition_id, it's not stable. - splitSpec.partition_id = 0; - spec.addToSplit_specs(splitSpec); - } - // Reloaded partitions may have a different order. Sort for stability. - spec.split_specs.sort(null); - } - hashThrift(spec); + scanNode.incorporateScansIntoTupleCache(this); } // The scan ranges have been incorporated into the key and are no longer needed // at runtime. @@ -341,6 +314,17 @@ public class TupleCacheInfo { hashTraceBuilder_.append(thriftString); } + /** + * Hash a regular string and incorporate it into the key + */ + public void hashString(String s) { + Preconditions.checkState(!finalized_, + "TupleCacheInfo is finalized and can't be modified"); + Preconditions.checkState(s != null); + hasher_.putUnencodedChars(s); + hashTraceBuilder_.append(s); + } + /** * registerTuple() does two things: * 1. It incorporates a tuple's layout (and slot information) into the cache key. diff --git a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java index 34221fa6e..a854d7542 100644 --- a/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java +++ b/fe/src/test/java/org/apache/impala/planner/TupleCacheTest.java @@ -436,6 +436,18 @@ public class TupleCacheTest extends PlannerTestBase { verifyJoinNodesEligible( "select * from functional_parquet.iceberg_v2_positional_delete_all_rows", 1, /* isDistributedPlan */ true); + + // When incorporating the scan range information from the build side of the + // join, we need to also incorporate information about the partitions involved. + // scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table where + // all the partitions point to the same file. If we don't incorporate partition + // information, then it can't tell apart queries against different partitions. + String incorporatePartitionSqlTemplate = + "select straight_join build.j, probe.id from functional.alltypes probe, " + + "scale_db.num_partitions_1234_blocks_per_partition_1 build " + + "where probe.id = build.i and build.j = %s"; + verifyOverlappingCacheKeys(String.format(incorporatePartitionSqlTemplate, 1), + String.format(incorporatePartitionSqlTemplate, 2)); } @Test diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 3e6ce0c8a..9e7414716 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -1320,12 +1320,17 @@ class ImpalaTestSuite(BaseTestSuite): assert abs(a - b) / float(max(abs(a), abs(b))) <= diff_perc def _get_table_location(self, table_name, vector): - """ Returns the HDFS location of the table. - This method changes self.client to point to the dabatase described by 'vector'.""" - db_name = self.get_db_name_from_format(vector.get_table_format()) - self.__change_client_database(self.client, db_name=db_name) + """ Returns the HDFS location of the table. 
If the table is not fully qualified, + this uses the database from the vector.""" + is_fully_qualified = table_name.find(".") != -1 + if is_fully_qualified: + fq_table_name = table_name + else: + db_name = self.get_db_name_from_format(vector.get_table_format()) + fq_table_name = "{0}.{1}".format(db_name, table_name) + result = self.execute_query_using_client(self.client, - "describe formatted %s" % table_name, vector) + "describe formatted %s" % fq_table_name, vector) for row in result.data: if 'Location:' in row: return row.split('\t')[1] diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py index 4b5b13724..4df907b97 100644 --- a/tests/custom_cluster/test_tuple_cache.py +++ b/tests/custom_cluster/test_tuple_cache.py @@ -404,6 +404,42 @@ class TestTupleCacheSingle(TestTupleCacheBase): self.run_test_case('QueryTest/parquet-resolution-by-name', vector, use_db=unique_database) + def test_partition_information(self, vector): + """Verify that partition information is incorporated into the runtime cache key""" + self.client.set_configuration(vector.get_value('exec_option')) + + # scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table where all + # the partitions point to the same filesystem location. A single file is read many + # times for different partitions. It is not possible to tell the partitions apart + # by the file path, so this verifies that the partition information is being included + # properly. + query_template = \ + "select i, j from scale_db.num_partitions_1234_blocks_per_partition_1 where j={0}" + # Run against the j=1 partition + result1 = self.execute_query(query_template.format(1)) + assert result1.success + assertCounters(result1.runtime_profile, 0, 0, 0) + assert len(result1.data) == 1 + assert result1.data[0].split("\t") == ["1", "1"] + + # Run against the j=2 partition. There should not be a cache hit, because they are + # running against different partitions. This only works if the runtime key + # incorporates the partition information. + result2 = self.execute_query(query_template.format(2)) + assert result2.success + assertCounters(result2.runtime_profile, 0, 0, 0) + assert len(result2.data) == 1 + assert result2.data[0].split("\t") == ["1", "2"] + + def test_json_binary_format(self, vector, unique_database): + """This is identical to test_scanners.py's TestBinaryType::test_json_binary_format. + That test modifies a table's serde properties to change the json binary format. + The tuple cache detects that by including the partition's storage descriptor + information. This fails if that doesn't happen.""" + test_tbl = unique_database + '.binary_tbl' + self.clone_table('functional_json.binary_tbl', test_tbl, False, vector) + self.run_test_case('QueryTest/json-binary-format', vector, unique_database) + @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS) class TestTupleCacheCluster(TestTupleCacheBase): diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py index 8b10a4d5b..5c0f0d433 100644 --- a/tests/query_test/test_scanners_fuzz.py +++ b/tests/query_test/test_scanners_fuzz.py @@ -108,18 +108,22 @@ class TestScannersFuzzing(ImpalaTestSuite): src_db = QueryTestSectionReader.get_db_name(table_format) if table_format.file_format not in ['parquet', 'orc']: pytest.skip() + # For historical reasons, this test ran against the wrong database and was + # running against the uncorrupted table. Now that this is corrected, it frequently + # crashes Impala. 
Until that is fixed in IMPALA-14219, we need to xfail this test. + pytest.xfail("IMPALA-14219: this test can crash Impala") # Additional queries to scan the nested values. custom_queries = [ "select count(*) from (" " select distinct t.id, a.pos as apos, a.item as aitem, aa.pos, aa.item, " " m.key as mkey, m.value as mvalue, ma.key, ma.value, t.nested_struct.* " - " from complextypestbl t, t.int_array a, t.int_array_array.item aa, " + " from {db}.{table} t, t.int_array a, t.int_array_array.item aa, " " t.int_map m, t.int_map_array.item ma) q", "select count(*) from (" " select t.id, t.nested_struct.a, b.pos as bpos, b.item as bitem, i.e, i.f, m.key," " arr.pos, arr.item " - " from complextypestbl t, t.nested_struct.b, t.nested_struct.c.d.item i," + " from {db}.{table} t, t.nested_struct.b, t.nested_struct.c.d.item i," " t.nested_struct.g m, m.value.h.i arr) q", ] self.run_fuzz_test(vector, src_db, table_name, unique_database, table_name, 10, @@ -158,12 +162,16 @@ class TestScannersFuzzing(ImpalaTestSuite): def test_fuzz_parquet_v2(self, vector, unique_database): table_format = vector.get_value('table_format') if table_format.file_format != 'parquet': pytest.skip() + # For historical reasons, this test ran against the wrong database and was + # running against the uncorrupted table. Now that this is corrected, it frequently + # crashes Impala. Until that is fixed in IMPALA-14219, we need to xfail this test. + pytest.xfail("IMPALA-14219: this test can crash Impala") tables = ["alltypesagg_parquet_v2_uncompressed", "alltypesagg_parquet_v2_snappy"] for table_name in tables: custom_queries = [ "select avg(float_col), avg(double_col), avg(timestamp_col)" - " from %s where bool_col;" % table_name + " from {db}.{table} where bool_col;" ] self.run_fuzz_test(vector, "functional_parquet", table_name, unique_database, table_name, 10, custom_queries) @@ -172,7 +180,7 @@ class TestScannersFuzzing(ImpalaTestSuite): "complextypestbl_parquet_v2_snappy"] for table_name in tables: custom_queries = [ - "select int_array from %s;" % table_name + "select int_array from {db}.{table};" ] self.run_fuzz_test(vector, "functional_parquet", table_name, unique_database, table_name, 10, custom_queries) @@ -188,6 +196,8 @@ class TestScannersFuzzing(ImpalaTestSuite): SCANNER_FUZZ_SEED can be set in the environment to reproduce the result (assuming that input files are the same). SCANNER_FUZZ_KEEP_FILES can be set in the environment to keep the generated files. + custom_queries can specify additional queries to run. References to '{db}' and + '{table}' in the custom queries are replaced with the fuzz db and table. """ # Create and seed a new random number generator for reproducibility. rng = random.Random() @@ -246,11 +256,11 @@ class TestScannersFuzzing(ImpalaTestSuite): # Also execute a count(*) that materializes no columns, since different code # paths are exercised. queries = [ - 'select count(*) from (select distinct * from {0}.{1}) q'.format( - fuzz_db, fuzz_table), - 'select count(*) from {0}.{1} q'.format(fuzz_db, fuzz_table)] + 'select count(*) from (select distinct * from {db}.{table}) q'.format( + db=fuzz_db, table=fuzz_table), + 'select count(*) from {db}.{table} q'.format(db=fuzz_db, table=fuzz_table)] if custom_queries is not None: - queries = queries + [s.format(fuzz_db, fuzz_table) for s in custom_queries] + queries = queries + [s.format(db=fuzz_db, table=fuzz_table) for s in custom_queries] for query, batch_size, disable_codegen in \ itertools.product(queries, self.BATCH_SIZES, self.DISABLE_CODEGEN_VALUES):
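Reviewer note (illustration only, not part of the patch): the heart of this change is that the runtime fragment-instance key now folds the per-partition settings into the chained hash before each file split is hashed, so two fragment instances that read the same file through different partitions produce different keys. The self-contained C++ sketch below shows that structure under simplified assumptions; PartitionInfo, SplitInfo, HashBytes, and ComputeInstanceKey are illustrative stand-ins, not Impala APIs (the patch itself uses HdfsPartitionDescriptor, HdfsFileSplitPB, and HashUtil::Hash).

// Illustrative sketch only: mirrors the hash chaining in
// TupleCacheNode::ComputeFragmentInstanceKey() with simplified stand-in types.
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Seeded FNV-1a style hash, standing in for HashUtil::Hash().
static uint32_t HashBytes(const void* data, size_t len, uint32_t seed) {
  uint32_t h = seed ^ 0x811c9dc5u;
  const unsigned char* p = static_cast<const unsigned char*>(data);
  for (size_t i = 0; i < len; ++i) { h ^= p[i]; h *= 16777619u; }
  return h;
}

struct PartitionInfo {  // stand-in for the HdfsPartitionDescriptor fields that are hashed
  char field_delim;
  std::string file_format;
  uint32_t partition_key_expr_hash;  // stable substitute for the unstable partition id
};

struct SplitInfo {  // stand-in for the HdfsFileSplitPB fields that are hashed
  std::string relative_path;
  int64_t offset;
  int64_t length;
  int64_t mtime;
};

static uint32_t HashPartition(const PartitionInfo& p, uint32_t seed) {
  uint32_t h = HashBytes(&p.field_delim, sizeof(p.field_delim), seed);
  h = HashBytes(p.file_format.data(), p.file_format.size(), h);
  h = HashBytes(&p.partition_key_expr_hash, sizeof(p.partition_key_expr_hash), h);
  return h;
}

static uint32_t HashSplit(const SplitInfo& s, uint32_t seed) {
  uint32_t h = HashBytes(s.relative_path.data(), s.relative_path.size(), seed);
  h = HashBytes(&s.offset, sizeof(s.offset), h);
  h = HashBytes(&s.length, sizeof(s.length), h);
  h = HashBytes(&s.mtime, sizeof(s.mtime), h);
  return h;
}

// The key is a single chained hash over all scan ranges of the instance. Folding the
// partition settings in before each split is what lets two instances that scan the
// same file through different partitions produce different keys.
uint32_t ComputeInstanceKey(
    const std::vector<std::pair<PartitionInfo, SplitInfo>>& scan_ranges) {
  uint32_t hash = 0;
  for (const auto& range : scan_ranges) {
    hash = HashPartition(range.first, hash);
    hash = HashSplit(range.second, hash);
  }
  return hash;
}

Seeding each step with the previous hash keeps the key sensitive to the pairing and order of partitions and splits, which mirrors why the patch hashes the partition descriptor immediately before its split rather than once per scan node.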