From 9d46853fbcc0fa86ffdee3a5027fa60b8207808c Mon Sep 17 00:00:00 2001 From: Alex Behm Date: Wed, 26 Aug 2015 18:09:20 -0700 Subject: [PATCH] Nested Types: Check un/supported file formats for complex types. Before this patch, we used to accept any query referencing complex types, regardless of the table/partition's file format being scanned. We would ultimately hit a DCHECK in the BE when attempting to scan complex types of a table/partition with an unsupported format. This patch makes queries fail gracefully during planning if a scan would access a table/partition in a format for which we do not support complex types. For mixed-format partitioned Hdfs tables we perform this check at the partition granularity, so such a table can be scanned as long as only partitions with supported formats are accessed. HBase tables with complex-typed columns can be scanned as long as no complex-typed columns are accessed in the query. Change-Id: I2fd2e386c9755faf2cfe326541698a7094fa0ffc Reviewed-on: http://gerrit.cloudera.org:8080/705 Reviewed-by: Alex Behm Tested-by: Internal Jenkins --- .../impala/catalog/HdfsFileFormat.java | 44 +++++- .../impala/planner/DataSourceScanNode.java | 8 +- .../impala/planner/DistributedPlanner.java | 23 ++- .../impala/planner/HBaseScanNode.java | 4 +- .../cloudera/impala/planner/HashJoinNode.java | 3 +- .../cloudera/impala/planner/HdfsScanNode.java | 48 ++++++- .../com/cloudera/impala/planner/JoinNode.java | 4 +- .../impala/planner/NestedLoopJoinNode.java | 4 +- .../com/cloudera/impala/planner/PlanNode.java | 4 +- .../com/cloudera/impala/planner/ScanNode.java | 27 ++++ .../impala/planner/SingleNodePlanner.java | 2 +- .../impala/planner/SingularRowSrcNode.java | 4 +- .../cloudera/impala/planner/UnnestNode.java | 4 +- .../cloudera/impala/planner/PlannerTest.java | 5 + .../impala/planner/PlannerTestBase.java | 2 +- testdata/bin/generate-schema-statements.py | 14 -- .../functional/functional_schema_template.sql | 49 +++++++ .../functional/schema_constraints.csv | 7 + .../complex-types-file-formats.test | 131 ++++++++++++++++++ 19 files changed, 336 insertions(+), 51 deletions(-) create mode 100644 testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test diff --git a/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java b/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java index 34412bee0..e1f4d4962 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/HdfsFileFormat.java @@ -14,48 +14,61 @@ package com.cloudera.impala.catalog; +import java.util.List; import java.util.Map; import com.cloudera.impala.thrift.THdfsFileFormat; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; /** * Supported HDFS file formats. Every file format specifies: * 1) the input format class * 2) the output format class * 3) the serialization library class + * 4) whether scanning complex types from it is supported * * Important note: Always keep consistent with the classes used in Hive. 
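*
* Each format entry supplies these four properties as constructor arguments. Schematically
* (the names below are placeholders, not a real entry):
*
*   SOME_FORMAT("some.package.SomeInputFormat",
*       "some.package.SomeOutputFormat",
*       "some.package.SomeSerDe",
*       false),   // complex types cannot be scanned from this format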
*/ public enum HdfsFileFormat { RC_FILE("org.apache.hadoop.hive.ql.io.RCFileInputFormat", "org.apache.hadoop.hive.ql.io.RCFileOutputFormat", - "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"), + "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe", + false), TEXT("org.apache.hadoop.mapred.TextInputFormat", "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"), + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + false), LZO_TEXT("com.hadoop.mapred.DeprecatedLzoTextInputFormat", "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", - ""), + "", + false), SEQUENCE_FILE("org.apache.hadoop.mapred.SequenceFileInputFormat", "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat", - "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"), + "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", false), AVRO("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat", "org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat", - "org.apache.hadoop.hive.serde2.avro.AvroSerDe"), + "org.apache.hadoop.hive.serde2.avro.AvroSerDe", + true), PARQUET("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", - "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"); + "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + true); private final String inputFormat_; private final String outputFormat_; private final String serializationLib_; - HdfsFileFormat(String inputFormat, String outputFormat, String serializationLib) { + // Indicates whether we support scanning complex types for this file format. + private final boolean isComplexTypesSupported_; + + HdfsFileFormat(String inputFormat, String outputFormat, String serializationLib, + boolean isComplexTypesSupported) { inputFormat_ = inputFormat; outputFormat_ = outputFormat; serializationLib_ = serializationLib; + isComplexTypesSupported_ = isComplexTypesSupported; } public String inputFormat() { return inputFormat_; } @@ -214,4 +227,21 @@ public enum HdfsFileFormat { + this + " - should never happen!"); } } + + /** + * Returns true if Impala supports scanning complex-typed columns + * from a table/partition with this file format. + */ + public boolean isComplexTypesSupported() { return isComplexTypesSupported_; } + + /** + * Returns a list with all formats for which isComplexTypesSupported() is true. 
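+ *
+ * The scan nodes use this list to build the error suffix shown when a scan of complex
+ * types is rejected; e.g. the call they make (with the declarations above it yields
+ * "AVRO, PARQUET"):
+ *
+ *   Joiner.on(", ").join(HdfsFileFormat.complexTypesFormats())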
+ */ + public static List complexTypesFormats() { + List result = Lists.newArrayList(); + for (HdfsFileFormat f: values()) { + if (f.isComplexTypesSupported()) result.add(f); + } + return result; + } } diff --git a/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java index e37c13aa5..bfe40d7ab 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/DataSourceScanNode.java @@ -31,6 +31,7 @@ import com.cloudera.impala.analysis.StringLiteral; import com.cloudera.impala.analysis.TupleDescriptor; import com.cloudera.impala.catalog.DataSource; import com.cloudera.impala.catalog.DataSourceTable; +import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.InternalException; import com.cloudera.impala.extdatasource.ExternalDataSourceExecutor; import com.cloudera.impala.extdatasource.thrift.TBinaryPredicate; @@ -88,7 +89,8 @@ public class DataSourceScanNode extends ScanNode { } @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { + checkForSupportedFileFormats(); assignConjuncts(analyzer); analyzer.createEquivConjuncts(tupleIds_.get(0), conjuncts_); prepareDataSource(); @@ -223,12 +225,12 @@ public class DataSourceScanNode extends ScanNode { TComparisonOp op = null; if ((conjunct.getChild(0).unwrapSlotRef(true) instanceof SlotRef) && (conjunct.getChild(1) instanceof LiteralExpr)) { - slotRef = (SlotRef) conjunct.getChild(0).unwrapSlotRef(true); + slotRef = conjunct.getChild(0).unwrapSlotRef(true); literalExpr = (LiteralExpr) conjunct.getChild(1); op = ((BinaryPredicate) conjunct).getOp().getThriftOp(); } else if ((conjunct.getChild(1).unwrapSlotRef(true) instanceof SlotRef) && (conjunct.getChild(0) instanceof LiteralExpr)) { - slotRef = (SlotRef) conjunct.getChild(1).unwrapSlotRef(true); + slotRef = conjunct.getChild(1).unwrapSlotRef(true); literalExpr = (LiteralExpr) conjunct.getChild(0); op = ((BinaryPredicate) conjunct).getOp().converse().getThriftOp(); } else { diff --git a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java index a1e98988c..4f680d02c 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/DistributedPlanner.java @@ -29,7 +29,6 @@ import com.cloudera.impala.analysis.JoinOperator; import com.cloudera.impala.analysis.QueryStmt; import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.InternalException; -import com.cloudera.impala.common.NotImplementedException; import com.cloudera.impala.planner.JoinNode.DistributionMode; import com.cloudera.impala.thrift.TPartitionType; import com.google.common.base.Preconditions; @@ -94,7 +93,7 @@ public class DistributedPlanner { private PlanFragment createPlanFragments( PlanNode root, boolean isPartitioned, long perNodeMemLimit, ArrayList fragments) - throws InternalException, NotImplementedException { + throws ImpalaException { ArrayList childFragments = Lists.newArrayList(); for (PlanNode child: root.getChildren()) { // allow child fragments to be partitioned, unless they contain a limit clause @@ -187,7 +186,7 @@ public class DistributedPlanner { public PlanFragment createInsertFragment( PlanFragment inputFragment, InsertStmt insertStmt, Analyzer analyzer, ArrayList fragments) - 
throws InternalException { + throws ImpalaException { List partitionExprs = insertStmt.getPartitionKeyExprs(); Boolean partitionHint = insertStmt.isRepartition(); if (partitionExprs.isEmpty()) return inputFragment; @@ -249,7 +248,7 @@ public class DistributedPlanner { * Requires that input fragment be partitioned. */ private PlanFragment createMergeFragment(PlanFragment inputFragment) - throws InternalException { + throws ImpalaException { Preconditions.checkState(inputFragment.isPartitioned()); ExchangeNode mergePlan = new ExchangeNode(ctx_.getNextNodeId()); mergePlan.addChild(inputFragment.getPlanRoot(), false); @@ -290,7 +289,7 @@ public class DistributedPlanner { private PlanFragment createNestedLoopJoinFragment(NestedLoopJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList fragments) - throws InternalException { + throws ImpalaException { node.setDistributionMode(DistributionMode.BROADCAST); node.setChild(0, leftChildFragment.getPlanRoot()); connectChildFragment(node, 1, rightChildFragment); @@ -313,7 +312,7 @@ public class DistributedPlanner { HashJoinNode node, PlanFragment rightChildFragment, PlanFragment leftChildFragment, long perNodeMemLimit, ArrayList fragments) - throws InternalException { + throws ImpalaException { // broadcast: send the rightChildFragment's output to each node executing // the leftChildFragment; the cost across all nodes is proportional to the // total amount of data sent @@ -583,7 +582,7 @@ public class DistributedPlanner { */ private PlanFragment createUnionNodeFragment(UnionNode unionNode, ArrayList childFragments, ArrayList fragments) - throws InternalException { + throws ImpalaException { Preconditions.checkState(unionNode.getChildren().size() == childFragments.size()); // A UnionNode could have no children or constant selects if all of its operands @@ -656,7 +655,7 @@ public class DistributedPlanner { * input from childFragment. 
*/ private void connectChildFragment(PlanNode node, int childIdx, - PlanFragment childFragment) throws InternalException { + PlanFragment childFragment) throws ImpalaException { ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId()); exchangeNode.addChild(childFragment.getPlanRoot(), false); exchangeNode.init(ctx_.getRootAnalyzer()); @@ -676,7 +675,7 @@ public class DistributedPlanner { */ private PlanFragment createParentFragment( PlanFragment childFragment, DataPartition parentPartition) - throws InternalException { + throws ImpalaException { ExchangeNode exchangeNode = new ExchangeNode(ctx_.getNextNodeId()); exchangeNode.addChild(childFragment.getPlanRoot(), false); exchangeNode.init(ctx_.getRootAnalyzer()); @@ -698,7 +697,7 @@ public class DistributedPlanner { */ private PlanFragment createAggregationFragment(AggregationNode node, PlanFragment childFragment, ArrayList fragments) - throws InternalException { + throws ImpalaException { if (!childFragment.isPartitioned()) { // nothing to distribute; do full aggregation directly within childFragment childFragment.addPlanRoot(node); @@ -848,7 +847,7 @@ public class DistributedPlanner { */ private PlanFragment createAnalyticFragment(PlanNode node, PlanFragment childFragment, ArrayList fragments) - throws InternalException { + throws ImpalaException { Preconditions.checkState( node instanceof SortNode || node instanceof AnalyticEvalNode); if (node instanceof AnalyticEvalNode) { @@ -894,7 +893,7 @@ public class DistributedPlanner { */ private PlanFragment createOrderByFragment(SortNode node, PlanFragment childFragment, ArrayList fragments) - throws InternalException { + throws ImpalaException { node.setChild(0, childFragment.getPlanRoot()); childFragment.addPlanRoot(node); if (!childFragment.isPartitioned()) return childFragment; diff --git a/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java index 95cb4bcc4..93633e700 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/HBaseScanNode.java @@ -41,6 +41,7 @@ import com.cloudera.impala.catalog.HBaseColumn; import com.cloudera.impala.catalog.HBaseTable; import com.cloudera.impala.catalog.PrimitiveType; import com.cloudera.impala.catalog.Type; +import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.InternalException; import com.cloudera.impala.common.Pair; import com.cloudera.impala.service.FeSupport; @@ -113,7 +114,8 @@ public class HBaseScanNode extends ScanNode { } @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { + checkForSupportedFileFormats(); assignConjuncts(analyzer); setStartStopKey(analyzer); // Convert predicates to HBase filters_. 
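// Note: HBaseScanNode does not override checkForSupportedFileFormats(), so the base
// ScanNode implementation (see ScanNode.java below) applies: the scan is rejected only
// if the query would actually materialize a complex-typed column. This is why HBase
// tables with complex-typed columns remain scannable otherwise.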
diff --git a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java index b3b87f564..26ad95660 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/HashJoinNode.java @@ -26,6 +26,7 @@ import com.cloudera.impala.analysis.ExprSubstitutionMap; import com.cloudera.impala.analysis.JoinOperator; import com.cloudera.impala.catalog.Type; import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.InternalException; import com.cloudera.impala.thrift.TEqJoinCondition; import com.cloudera.impala.thrift.TExplainLevel; @@ -63,7 +64,7 @@ public class HashJoinNode extends JoinNode { public void setAddProbeFilters(boolean b) { addProbeFilters_ = true; } @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { super.init(analyzer); List newEqJoinConjuncts = Lists.newArrayList(); ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap(); diff --git a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java index 57fc46557..eec7c1c5e 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/HdfsScanNode.java @@ -41,13 +41,16 @@ import com.cloudera.impala.analysis.SlotId; import com.cloudera.impala.analysis.SlotRef; import com.cloudera.impala.analysis.TupleDescriptor; import com.cloudera.impala.analysis.TupleId; +import com.cloudera.impala.catalog.Column; import com.cloudera.impala.catalog.HdfsFileFormat; import com.cloudera.impala.catalog.HdfsPartition; import com.cloudera.impala.catalog.HdfsPartition.FileBlock; import com.cloudera.impala.catalog.HdfsTable; import com.cloudera.impala.catalog.Type; import com.cloudera.impala.common.AnalysisException; +import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.InternalException; +import com.cloudera.impala.common.NotImplementedException; import com.cloudera.impala.common.PrintUtils; import com.cloudera.impala.common.RuntimeEnv; import com.cloudera.impala.thrift.TExplainLevel; @@ -63,6 +66,7 @@ import com.cloudera.impala.thrift.TScanRange; import com.cloudera.impala.thrift.TScanRangeLocation; import com.cloudera.impala.thrift.TScanRangeLocations; import com.cloudera.impala.util.MembershipSnapshot; +import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Objects.ToStringHelper; import com.google.common.base.Preconditions; @@ -137,7 +141,7 @@ public class HdfsScanNode extends ScanNode { * Populate conjuncts_, collectionConjuncts_, partitions_, and scanRanges_. 
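* Note that the complex-types file format check runs after partition pruning, so a scan
* of a mixed-format table still plans successfully as long as every partition that
* survives pruning has a format that supports complex types.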
*/ @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { ArrayList bindingPredicates = analyzer.getBoundPredicates(tupleIds_.get(0)); conjuncts_.addAll(bindingPredicates); @@ -149,6 +153,7 @@ public class HdfsScanNode extends ScanNode { // do partition pruning before deciding which slots to materialize, // we might end up removing some predicates prunePartitions(analyzer); + checkForSupportedFileFormats(); // mark all slots referenced by the remaining conjuncts as materialized markSlotsMaterialized(analyzer, conjuncts_); @@ -167,6 +172,47 @@ public class HdfsScanNode extends ScanNode { assignedConjuncts_ = analyzer.getAssignedConjuncts(); } + /** + * Throws if the table schema contains a complex type and we need to scan + * a partition that has a format for which we do not support complex types, + * regardless of whether a complex-typed column is actually referenced + * in the query. + */ + @Override + protected void checkForSupportedFileFormats() throws NotImplementedException { + Preconditions.checkNotNull(desc_); + Preconditions.checkNotNull(desc_.getTable()); + Column firstComplexTypedCol = null; + for (Column col: desc_.getTable().getColumns()) { + if (col.getType().isComplexType()) { + firstComplexTypedCol = col; + break; + } + } + if (firstComplexTypedCol == null) return; + + for (HdfsPartition part: partitions_) { + HdfsFileFormat format = part.getInputFormatDescriptor().getFileFormat(); + if (format.isComplexTypesSupported()) continue; + String errSuffix = String.format( + "Complex types are supported for these file formats: %s", + Joiner.on(", ").join(HdfsFileFormat.complexTypesFormats())); + if (desc_.getTable().getNumClusteringCols() == 0) { + throw new NotImplementedException(String.format( + "Scan of table '%s' in format '%s' is not supported because the table " + + "has a column '%s' with a complex type '%s'.\n%s.", + desc_.getAlias(), format, firstComplexTypedCol.getName(), + firstComplexTypedCol.getType().toSql(), errSuffix)); + } + throw new NotImplementedException(String.format( + "Scan of partition '%s' in format '%s' of table '%s' is not supported " + + "because the table has a column '%s' with a complex type '%s'.\n%s.", + part.getPartitionName(), format, desc_.getAlias(), + firstComplexTypedCol.getName(), firstComplexTypedCol.getType().toSql(), + errSuffix)); + } + } + /** * Populates the collection conjuncts, materializes their required slots, and marks * the conjuncts as assigned, if it is correct to do so. 
Some conjuncts may have to diff --git a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java index d3a8eba69..4b49dea90 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/JoinNode.java @@ -27,7 +27,7 @@ import com.cloudera.impala.analysis.SlotDescriptor; import com.cloudera.impala.analysis.SlotRef; import com.cloudera.impala.catalog.ColumnStats; import com.cloudera.impala.catalog.Table; -import com.cloudera.impala.common.InternalException; +import com.cloudera.impala.common.ImpalaException; import com.google.common.base.Preconditions; /** @@ -130,7 +130,7 @@ public abstract class JoinNode extends PlanNode { public void setDistributionMode(DistributionMode distrMode) { distrMode_ = distrMode; } @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { super.init(analyzer); assignedConjuncts_ = analyzer.getAssignedConjuncts(); otherJoinConjuncts_ = Expr.substituteList(otherJoinConjuncts_, diff --git a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java index 1b2b8f987..2e86bae3c 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/NestedLoopJoinNode.java @@ -24,7 +24,7 @@ import com.cloudera.impala.analysis.Analyzer; import com.cloudera.impala.analysis.BinaryPredicate; import com.cloudera.impala.analysis.Expr; import com.cloudera.impala.analysis.JoinOperator; -import com.cloudera.impala.common.InternalException; +import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.thrift.TExplainLevel; import com.cloudera.impala.thrift.TNestedLoopJoinNode; import com.cloudera.impala.thrift.TPlanNode; @@ -56,7 +56,7 @@ public class NestedLoopJoinNode extends JoinNode { } @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { super.init(analyzer); Preconditions.checkState(eqJoinConjuncts_.isEmpty()); // Set the proper join operator based on whether predicates are assigned or not. diff --git a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java index f42576c6b..77e00ba1f 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/PlanNode.java @@ -28,7 +28,7 @@ import com.cloudera.impala.analysis.ExprSubstitutionMap; import com.cloudera.impala.analysis.SlotId; import com.cloudera.impala.analysis.TupleDescriptor; import com.cloudera.impala.analysis.TupleId; -import com.cloudera.impala.common.InternalException; +import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.common.PrintUtils; import com.cloudera.impala.common.TreeNode; import com.cloudera.impala.thrift.TExecStats; @@ -390,7 +390,7 @@ abstract public class PlanNode extends TreeNode { * state required for toThrift(). This is called directly after construction. * Throws if an expr substitution or evaluation fails. 
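* The broader ImpalaException signature allows subclasses such as the scan nodes to
* surface a NotImplementedException, e.g. when a scan would touch complex types on a
* file format that does not support them.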
*/ - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { assignConjuncts(analyzer); computeStats(analyzer); createDefaultSmap(analyzer); diff --git a/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java b/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java index 6f3a577ce..b6d6b9a64 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/ScanNode.java @@ -18,6 +18,8 @@ import java.util.List; import com.cloudera.impala.analysis.SlotDescriptor; import com.cloudera.impala.analysis.TupleDescriptor; +import com.cloudera.impala.catalog.HdfsFileFormat; +import com.cloudera.impala.common.NotImplementedException; import com.cloudera.impala.thrift.TExplainLevel; import com.cloudera.impala.thrift.TNetworkAddress; import com.cloudera.impala.thrift.TScanRangeLocations; @@ -48,6 +50,31 @@ abstract public class ScanNode extends PlanNode { public TupleDescriptor getTupleDesc() { return desc_; } + /** + * Checks if this scan is supported based on the types of scanned columns and the + * underlying file formats, in particular, whether complex types are supported. + * + * The default implementation throws if this scan would need to materialize a nested + * field or collection. The scan is ok if the table schema contains complex types, as + * long as the query does not reference them. + * + * Subclasses should override this function as appropriate. + */ + protected void checkForSupportedFileFormats() throws NotImplementedException { + Preconditions.checkNotNull(desc_); + Preconditions.checkNotNull(desc_.getTable()); + for (SlotDescriptor slotDesc: desc_.getSlots()) { + if (slotDesc.getType().isComplexType() || slotDesc.getColumn() == null) { + Preconditions.checkNotNull(slotDesc.getPath()); + throw new NotImplementedException(String.format( + "Scan of table '%s' is not supported because '%s' references a nested " + + "field/collection.\nComplex types are supported for these file formats: %s.", + slotDesc.getPath().toString(), desc_.getAlias(), + Joiner.on(", ").join(HdfsFileFormat.complexTypesFormats()))); + } + } + } + /** * Returns all scan ranges plus their locations. */ diff --git a/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java b/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java index 21516c96b..0854af054 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java +++ b/fe/src/main/java/com/cloudera/impala/planner/SingleNodePlanner.java @@ -855,7 +855,7 @@ public class SingleNodePlanner { * Assigns conjuncts from the Having clause to the returned node. 
*/ private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer, - PlanNode root) throws InternalException { + PlanNode root) throws ImpalaException { Preconditions.checkState(selectStmt.getAggInfo() != null); // add aggregation, if required AggregateInfo aggInfo = selectStmt.getAggInfo(); diff --git a/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java b/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java index 1ed9665a1..93085410e 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/SingularRowSrcNode.java @@ -15,7 +15,7 @@ package com.cloudera.impala.planner; import com.cloudera.impala.analysis.Analyzer; -import com.cloudera.impala.common.InternalException; +import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.thrift.TExplainLevel; import com.cloudera.impala.thrift.TPlanNode; import com.cloudera.impala.thrift.TPlanNodeType; @@ -39,7 +39,7 @@ public class SingularRowSrcNode extends PlanNode { } @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { super.init(analyzer); outputSmap_ = containingSubplanNode_.getChild(0).getOutputSmap(); Preconditions.checkState(conjuncts_.isEmpty()); diff --git a/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java b/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java index f74eb8e58..1b12cba9e 100644 --- a/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java +++ b/fe/src/main/java/com/cloudera/impala/planner/UnnestNode.java @@ -17,7 +17,7 @@ package com.cloudera.impala.planner; import com.cloudera.impala.analysis.Analyzer; import com.cloudera.impala.analysis.CollectionTableRef; import com.cloudera.impala.analysis.Expr; -import com.cloudera.impala.common.InternalException; +import com.cloudera.impala.common.ImpalaException; import com.cloudera.impala.thrift.TExplainLevel; import com.cloudera.impala.thrift.TPlanNode; import com.cloudera.impala.thrift.TPlanNodeType; @@ -47,7 +47,7 @@ public class UnnestNode extends PlanNode { } @Override - public void init(Analyzer analyzer) throws InternalException { + public void init(Analyzer analyzer) throws ImpalaException { // Do not assign binding predicates or predicates for enforcing slot equivalences // because they must have been assigned in the scan node materializing the // collection-typed slot. diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java index bb2a426ed..4b018c797 100644 --- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java +++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTest.java @@ -71,6 +71,11 @@ public class PlannerTest extends PlannerTestBase { runPlannerTestFile("nested-collections"); } + @Test + public void testComplexTypesFileFormats() { + runPlannerTestFile("complex-types-file-formats"); + } + @Test public void testJoins() { runPlannerTestFile("joins"); diff --git a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java index df1d2041e..746b908d0 100644 --- a/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java +++ b/fe/src/test/java/com/cloudera/impala/planner/PlannerTestBase.java @@ -306,7 +306,7 @@ public class PlannerTestBase { } else { // Compare actual and expected error messages. 
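// The expected message is matched as a prefix of the actual message, so a test only
// needs to list the leading lines of an error; the actual exception may carry
// additional trailing detail.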
if (expectedErrorMsg != null && !expectedErrorMsg.isEmpty()) { - if (!e.getMessage().toLowerCase().equals(expectedErrorMsg.toLowerCase())) { + if (!e.getMessage().toLowerCase().startsWith(expectedErrorMsg.toLowerCase())) { errorLog.append("query:\n" + query + "\nExpected error message: '" + expectedErrorMsg + "'\nActual error message: '" + e.getMessage() + "'\n"); diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py index 6a74d069f..b937b3f0b 100755 --- a/testdata/bin/generate-schema-statements.py +++ b/testdata/bin/generate-schema-statements.py @@ -495,20 +495,6 @@ def generate_statements(output_name, test_vectors, sections, # sections), which is used to generate the create table statement. if create_hive: table_template = create_hive - # Loading dependent Avro tables involves generating an Avro schema literal from - # the COLUMNS section, but the COLUMNS section is not provided for CREATE_HIVE. - # The custom CREATE TABLE leaves the columns opaque to us, so we cannot generate - # an Avro schema literal. - # However, if the schema constraints are set up such that we are only going to - # to load this single Avro table, then we can safely proceed assuming that the - # provided CREATE TABLE has all necessary information to create an Avro table. - # TODO: Remove this restriction once Impala has the ability to infer the Avro - # schema from column definitions. Then we do not need to generate an Avro - # schema literal for creating dependent Avro tables anymore. - load_single_table = len(schema_include_constraints[table_name.lower()]) == 1 - if file_format == 'avro' and not load_single_table: - print 'CREATE_HIVE section not supported' - continue elif create: table_template = create if file_format in ['avro', 'hbase']: diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index a93f07780..e945f438a 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -555,6 +555,55 @@ delimited fields terminated by ',' escaped by '\\' ---- DATASET functional ---- BASE_TABLE_NAME +complextypes_fileformat +---- CREATE_HIVE +-- Used for positive/negative testing of complex types on various file formats. +-- In particular, queries on file formats for which we do not support complex types +-- should fail gracefully. +CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} ( + id int, + s struct, + a array, + m map) +STORED AS {file_format}; +---- ALTER +-- This INSERT is placed in the ALTER section and not in the DEPENDENT_LOAD section because +-- it must always be executed in Hive. The DEPENDENT_LOAD section is sometimes executed in +-- Impala, but Impala currently does not support inserting into tables with complex types. +INSERT OVERWRITE TABLE {table_name} SELECT * FROM functional.{table_name}; +---- LOAD +INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT id, named_struct("f1",string_col,"f2",int_col), array(1, 2, 3), map("k", cast(0 as bigint)) FROM functional.alltypestiny; +==== +---- DATASET +functional +---- BASE_TABLE_NAME +complextypes_multifileformat +---- CREATE_HIVE +-- Used for positive/negative testing of complex types on various file formats. +-- In particular, queries on file formats for which we do not support complex types +-- should fail gracefully. This table allows testing at a partition granularity. 
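-- The LOAD section below first populates three partitions and then alters p=2 to
-- Parquet and p=3 to Avro, leaving p=1 in the table's original file format.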
+CREATE TABLE IF NOT EXISTS {db_name}{db_suffix}.{table_name} ( + id int, + s struct, + a array, + m map) +PARTITIONED BY (p int) +STORED AS {file_format}; +---- LOAD +INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} PARTITION(p=1) SELECT id, named_struct("f1",string_col,"f2",int_col), array(1, 2, 3), map("k", cast(0 as bigint)) FROM functional.alltypestiny; +INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} PARTITION(p=2) SELECT id, named_struct("f1",string_col,"f2",int_col), array(1, 2, 3), map("k", cast(0 as bigint)) FROM functional.alltypestiny; +INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} PARTITION(p=3) SELECT id, named_struct("f1",string_col,"f2",int_col), array(1, 2, 3), map("k", cast(0 as bigint)) FROM functional.alltypestiny; +-- The order of insertions and alterations is deliberately chose to work around a Hive +-- bug where the format of an altered partition is reverted back to the original format after +-- an insert. So we first do the insert, and then alter the format. +USE {db_name}{db_suffix}; +ALTER TABLE {table_name} PARTITION (p=2) SET FILEFORMAT PARQUET; +ALTER TABLE {table_name} PARTITION (p=3) SET FILEFORMAT AVRO; +USE default; +==== +---- DATASET +functional +---- BASE_TABLE_NAME testtbl ---- COLUMNS id bigint diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv index 4fea58fbf..129f7a2e6 100644 --- a/testdata/datasets/functional/schema_constraints.csv +++ b/testdata/datasets/functional/schema_constraints.csv @@ -50,6 +50,13 @@ table_name:allcomplextypes, constraint:restrict_to, table_format:parquet/none/no table_name:allcomplextypes, constraint:restrict_to, table_format:hbase/none/none table_name:functional, constraint:restrict_to, table_format:text/none/none +table_name:complextypes_fileformat, constraint:restrict_to, table_format:text/none/none +table_name:complextypes_fileformat, constraint:restrict_to, table_format:parquet/none/none +table_name:complextypes_fileformat, constraint:restrict_to, table_format:avro/snap/block +table_name:complextypes_fileformat, constraint:restrict_to, table_format:rc/snap/block +table_name:complextypes_fileformat, constraint:restrict_to, table_format:seq/snap/block +table_name:complextypes_multifileformat, constraint:restrict_to, table_format:text/none/none + table_name:alltypeserror, constraint:exclude, table_format:parquet/none/none table_name:alltypeserrornonulls, constraint:exclude, table_format:parquet/none/none table_name:unsupported_types, constraint:exclude, table_format:parquet/none/none diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test b/testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test new file mode 100644 index 000000000..a3e8ecf4e --- /dev/null +++ b/testdata/workloads/functional-planner/queries/PlannerTest/complex-types-file-formats.test @@ -0,0 +1,131 @@ +# Scanning an unpartitioned Parquet table with complex types plans ok. +select s.f1 from functional_parquet.complextypes_fileformat t, t.a +---- PLAN +01:SUBPLAN +| +|--04:NESTED LOOP JOIN [CROSS JOIN] +| | +| |--02:SINGULAR ROW SRC +| | +| 03:UNNEST [t.a] +| +00:SCAN HDFS [functional_parquet.complextypes_fileformat t] + partitions=1/1 files=1 size=204B +==== +# Scanning an unpartitioned Avro table with complex types plans ok. 
+select s.f1 from functional_avro_snap.complextypes_fileformat t, t.a +---- PLAN +01:SUBPLAN +| +|--04:NESTED LOOP JOIN [CROSS JOIN] +| | +| |--02:SINGULAR ROW SRC +| | +| 03:UNNEST [t.a] +| +00:SCAN HDFS [functional_avro_snap.complextypes_fileformat t] + partitions=1/1 files=1 size=649B +==== +# Complex types are not supported on text files. +select s.f1 from functional.complextypes_fileformat t, t.a +---- PLAN +not implemented: Scan of table 't' in format 'TEXT' is not supported because the table has a column 's' with a complex type 'STRUCT'. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# Complex types are not supported on text files, even if no complex-typed +# columns are selected. +select 1 from functional.complextypes_fileformat +---- PLAN +not implemented: Scan of table 'functional.complextypes_fileformat' in format 'TEXT' is not supported because the table has a column 's' with a complex type 'STRUCT'. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# Complex types are not supported on RC files. +select 1 from functional_rc_snap.complextypes_fileformat t, t.a +---- PLAN +not implemented: Scan of table 't' in format 'RC_FILE' is not supported because the table has a column 's' with a complex type 'STRUCT'. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# Complex types are not supported on RC files, even if no complex-typed +# columns are selected. +select 1 from functional_rc_snap.complextypes_fileformat +---- PLAN +not implemented: Scan of table 'functional_rc_snap.complextypes_fileformat' in format 'RC_FILE' is not supported because the table has a column 's' with a complex type 'STRUCT'. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# Complex types are not supported on sequence files. +select s.f1 from functional_seq_snap.complextypes_fileformat t, t.a +---- PLAN +not implemented: Scan of table 't' in format 'SEQUENCE_FILE' is not supported because the table has a column 's' with a complex type 'STRUCT'. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# Complex types are not supported on sequence files, even if no complex-typed +# columns are selected. +select 1 from functional_seq_snap.complextypes_fileformat +---- PLAN +not implemented: Scan of table 'functional_seq_snap.complextypes_fileformat' in format 'SEQUENCE_FILE' is not supported because the table has a column 's' with a complex type 'STRUCT'. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# Scanning all partitions fails because one of them is text. +select s.f1 from functional.complextypes_multifileformat t, t.a +---- PLAN +not implemented: Scan of partition 'p=1' in format 'TEXT' of table 't' is not supported because the table has a column 's' with a complex type 'STRUCT'. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# Scanning an HBase table with complex-types columns is ok as long as no complex-typed +# columns are selected. +select id from functional_hbase.allcomplextypes +---- PLAN +00:SCAN HBASE [functional_hbase.allcomplextypes] +==== +# Scanning an HBase table with complex-types columns fails if a complex-typed +# column is selected. +select id from functional_hbase.allcomplextypes t, t.int_array_col +---- PLAN +not implemented: Scan of table 't.int_array_col' is not supported because 't' references a nested field/collection. +Complex types are supported for these file formats: AVRO, PARQUET. 
+==== +# Scanning an HBase table with complex-types columns fails if a complex-typed +# column is selected. +select complex_struct_col.f1 from functional_hbase.allcomplextypes +---- PLAN +not implemented: Scan of table 'functional_hbase.allcomplextypes.complex_struct_col.f1' is not supported because 'functional_hbase.allcomplextypes' references a nested field/collection. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# The complextypes_multifileformat has three partitions with different file formats: +# p=1 text +# p=2 parquet +# p=3 avro +# Scanning a text partition of a multi-format table with complex types fails. +select 1 from functional.complextypes_multifileformat where p = 1 +---- PLAN +not implemented: Scan of partition 'p=1' in format 'TEXT' of table 'functional.complextypes_multifileformat' is not supported because the table has a column 's' with a complex type 'STRUCT'. +Complex types are supported for these file formats: AVRO, PARQUET. +==== +# Scanning a Parquet partition of a multi-format table with complex types plans ok. +select s.f1 from functional.complextypes_multifileformat t, t.a where p = 2 +---- PLAN +01:SUBPLAN +| +|--04:NESTED LOOP JOIN [CROSS JOIN] +| | +| |--02:SINGULAR ROW SRC +| | +| 03:UNNEST [t.a] +| +00:SCAN HDFS [functional.complextypes_multifileformat t] + partitions=1/3 files=1 size=128B +==== +# Scanning an Avro partition of a multi-format table with complex types plans ok. +select s.f1 from functional.complextypes_multifileformat t, t.a where p = 3 +---- PLAN +01:SUBPLAN +| +|--04:NESTED LOOP JOIN [CROSS JOIN] +| | +| |--02:SINGULAR ROW SRC +| | +| 03:UNNEST [t.a] +| +00:SCAN HDFS [functional.complextypes_multifileformat t] + partitions=1/3 files=1 size=128B +====