diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index b090aee9d..9f0c6d48c 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -349,20 +349,19 @@ struct TDistributeByHashParam {
   2: required i32 num_buckets
 }
 
-struct TRangeLiteral {
-  1: optional i64 int_literal
-  2: optional string string_literal
+// A range partition of a Kudu table. A bound whose value list is unset is unbounded
+// on that side; the matching inclusive flag is only set when the bound has values.
+struct TRangePartition {
+  1: optional list<Exprs.TExpr> lower_bound_values
+  2: optional bool is_lower_bound_inclusive
+  3: optional list<Exprs.TExpr> upper_bound_values
+  4: optional bool is_upper_bound_inclusive
 }
 
-struct TRangeLiteralList {
-  // TODO: Replace TRangeLiteral with Exprs.TExpr.
-  1: required list<TRangeLiteral> values
-}
-
-// A range distribution is identified by a list of columns and a series of split rows.
+// A range distribution is identified by a list of columns and a list of range partitions.
 struct TDistributeByRangeParam {
   1: required list<string> columns
-  2: optional list<TRangeLiteralList> split_rows;
+  2: optional list<TRangePartition> range_partitions
 }
 
 // Parameters for the DISTRIBUTE BY clause.
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup index af4f7a689..836bc2da2 100644 --- a/fe/src/main/cup/sql-parser.cup +++ b/fe/src/main/cup/sql-parser.cup @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java_cup.runtime.Symbol; @@ -28,6 +29,7 @@ import java_cup.runtime.Symbol; import org.apache.impala.analysis.ColumnDef; import org.apache.impala.analysis.UnionStmt.Qualifier; import org.apache.impala.analysis.UnionStmt.UnionOperand; +import org.apache.impala.analysis.RangePartition; import org.apache.impala.catalog.ArrayType; import org.apache.impala.catalog.MapType; import org.apache.impala.catalog.RowFormat; @@ -319,7 +321,7 @@ nonterminal SelectListItem star_expr; nonterminal Expr expr, non_pred_expr, arithmetic_expr, timestamp_arithmetic_expr; nonterminal ArrayList expr_list; nonterminal String alias_clause; -nonterminal ArrayList ident_list, primary_keys, opt_primary_keys; +nonterminal ArrayList ident_list, primary_keys; nonterminal ArrayList opt_ident_list; nonterminal TableName table_name; nonterminal FunctionName function_name; @@ -395,11 +397,13 @@ nonterminal ShowDataSrcsStmt show_data_srcs_stmt; nonterminal StructField struct_field_def; nonterminal String ident_or_keyword; nonterminal DistributeParam distribute_hash_param; +nonterminal List range_params_list; +nonterminal RangePartition range_param; +nonterminal Pair opt_lower_range_val, + opt_upper_range_val; nonterminal ArrayList distribute_hash_param_list; nonterminal ArrayList distribute_param_list; nonterminal DistributeParam distribute_range_param; -nonterminal List> split_row_list; -nonterminal List literal_list; nonterminal ColumnDef column_def, view_column_def; nonterminal ArrayList column_def_list, partition_column_defs, view_column_def_list, view_column_defs; @@ -489,6 +493,7 @@ 
precedence left BITAND, BITOR, BITXOR, BITNOT;
 precedence left FACTORIAL;
 precedence left KW_ORDER, KW_BY, KW_LIMIT;
 precedence left LPAREN, RPAREN;
+precedence left KW_VALUES;
 // Support chaining of timestamp arithmetic exprs.
 precedence left KW_INTERVAL;
 precedence left KW_TBLPROPERTIES;
@@ -1141,33 +1146,66 @@ distribute_hash_param ::=
 
 // The column list for a RANGE clause is optional.
 distribute_range_param ::=
-  KW_RANGE LPAREN ident_list:cols RPAREN KW_SPLIT KW_ROWS
-  LPAREN split_row_list:list RPAREN
-  {: RESULT = DistributeParam.createRangeParam(cols, list); :}
-  | KW_RANGE KW_SPLIT KW_ROWS LPAREN split_row_list:list RPAREN
-  {: RESULT = DistributeParam.createRangeParam(Lists.newArrayList(), list); :}
+  KW_RANGE LPAREN ident_list:cols RPAREN LPAREN range_params_list:ranges RPAREN
+  {:
+    RESULT = DistributeParam.createRangeParam(cols, ranges);
+  :}
+  | KW_RANGE LPAREN range_params_list:ranges RPAREN
+  {:
+    RESULT = DistributeParam.createRangeParam(Collections.emptyList(), ranges);
+  :}
   ;
 
-split_row_list ::=
-  LPAREN literal_list:l RPAREN
-  {: RESULT = Lists.<List<LiteralExpr>>newArrayList(l); :}
-  | split_row_list:list COMMA LPAREN literal_list:l RPAREN
+range_params_list ::=
+  range_param:param
   {:
-    list.add(l);
+    RESULT = Lists.newArrayList(param);
+  :}
+  | range_params_list:list COMMA range_param:param
+  {:
+    list.add(param);
     RESULT = list;
   :}
   ;
 
-literal_list ::=
-  literal:l
-  {: RESULT = Lists.newArrayList(l); :}
-  | literal_list:list COMMA literal:l
+range_param ::=
+  KW_PARTITION opt_lower_range_val:lower_val KW_VALUES opt_upper_range_val:upper_val
+  {: RESULT = RangePartition.createFromRange(lower_val, upper_val); :}
+  // Use dotted_path to avoid reduce/reduce conflicts with expr. VALUE is not a
+  // keyword, so the path must be the single identifier VALUE (any letter case).
+  | KW_PARTITION dotted_path:val EQUAL expr:l
   {:
-    list.add(l);
-    RESULT = list;
+    if (val.size() != 1 || !val.get(0).equalsIgnoreCase("VALUE")) {
+      parser.parseError("identifier", SqlParserSymbols.IDENT, "VALUE");
+    }
+    RESULT = RangePartition.createFromValues(Lists.newArrayList(l));
   :}
+  | KW_PARTITION dotted_path:val EQUAL LPAREN expr_list:l RPAREN
+  {:
+    if (val.size() != 1 || !val.get(0).equalsIgnoreCase("VALUE")) {
+      parser.parseError("identifier", SqlParserSymbols.IDENT, "VALUE");
+    }
+    RESULT = RangePartition.createFromValues(l); :}
   ;
 
+opt_lower_range_val ::=
+  expr:l LESSTHAN
+  {: RESULT = new Pair(l, false); :}
+  | expr:l LESSTHAN EQUAL
+  {: RESULT = new Pair(l, true); :}
+  | /* empty */
+  {: RESULT = null; :}
+  ;
+
+opt_upper_range_val ::=
+  LESSTHAN expr:l
+  {: RESULT = new Pair(l, false); :}
+  | LESSTHAN EQUAL expr:l
+  {: RESULT = new Pair(l, true); :}
+  | /* empty */
+  {: RESULT = null; :}
+  ;
+
 create_udf_stmt ::=
   KW_CREATE KW_FUNCTION if_not_exists_val:if_not_exists
   function_name:fn_name function_def_args:fn_args
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
index 1b634f763..923a0a62e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
@@ -19,12 +19,12 @@ package org.apache.impala.analysis;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.LinkedHashMap;
 import java.util.Map;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -182,10 +182,11 @@ public class ColumnDef {
 
   /**
    * Generates and returns a map of column names to column definitions. Assumes that
-   * the column names are unique.
+   * the column names are unique. It guarantees that the iteration order of the map
+   * is the same as the iteration order of 'colDefs'.
*/ static Map mapByColumnNames(Collection colDefs) { - Map colDefsByColName = Maps.newHashMap(); + Map colDefsByColName = new LinkedHashMap(); for (ColumnDef colDef: colDefs) { ColumnDef def = colDefsByColName.put(colDef.getColName(), colDef); Preconditions.checkState(def == null); diff --git a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java index 34bed860e..13fa6e603 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java +++ b/fe/src/main/java/org/apache/impala/analysis/DistributeParam.java @@ -25,75 +25,91 @@ import org.apache.impala.common.AnalysisException; import org.apache.impala.thrift.TDistributeByHashParam; import org.apache.impala.thrift.TDistributeByRangeParam; import org.apache.impala.thrift.TDistributeParam; -import org.apache.impala.thrift.TRangeLiteral; -import org.apache.impala.thrift.TRangeLiteralList; -import org.apache.impala.util.KuduUtil; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.Lists; /** - * Represents the information of + * Represents the distribution of a Kudu table as defined in the DISTRIBUTE BY + * clause of a CREATE TABLE statement. The distribution can be hash-based or + * range-based or both. See RangePartition for details on the supported range partitions. * - * DISTRIBUTE BY HASH[(col_def_list)] INTO n BUCKETS - * DISTRIBUTE BY RANGE[(col_def_list)] SPLIT ROWS ( (v1,v2,v3), ...) 
+ * Examples: + * - Hash-based: + * DISTRIBUTE BY HASH(id) INTO 10 BUCKETS + * - Single column range-based: + * DISTRIBUTE BY RANGE(age) + * ( + * PARTITION VALUES < 10, + * PARTITION 10 <= VALUES < 20, + * PARTITION 20 <= VALUES < 30, + * PARTITION VALUE = 100 + * ) + * - Combination of hash and range based: + * DISTRIBUTE BY HASH (id) INTO 3 BUCKETS, + * RANGE (age) + * ( + * PARTITION 10 <= VALUES < 20, + * PARTITION VALUE = 100 + * ) + * - Multi-column range based: + * DISTRIBUTE BY RANGE (year, quarter) + * ( + * PARTITION VALUE = (2001, 1), + * PARTITION VALUE = (2001, 2), + * PARTITION VALUE = (2002, 1) + * ) * - * clauses in CREATE TABLE statements, where available, e.g. Kudu. - * - * A table can be hash or range partitioned, or combinations of both. A distribute - * clause represents one particular distribution rule. For both HASH and RANGE types, - * some of the error checking is done during the analysis, but most of it is deferred - * until the table is actually created. - */ + */ public class DistributeParam implements ParseNode { /** - * Creates a DistributeParam partitioned by hash. + * Creates a hash-based DistributeParam. */ public static DistributeParam createHashParam(List cols, int buckets) { return new DistributeParam(Type.HASH, cols, buckets, null); } /** - * Creates a DistributeParam partitioned by range. + * Creates a range-based DistributeParam. */ public static DistributeParam createRangeParam(List cols, - List> splitRows) { - return new DistributeParam(Type.RANGE, cols, NO_BUCKETS, splitRows); + List rangePartitions) { + return new DistributeParam(Type.RANGE, cols, NO_BUCKETS, rangePartitions); } private static final int NO_BUCKETS = -1; /** - * The type of the distribution rule. + * The distribution type. */ public enum Type { HASH, RANGE } - // May be empty indicating that all keys in the table should be used. + // Columns of this distribution. 
If no columns are specified, all + // the primary key columns of the associated table are used. private final List colNames_ = Lists.newArrayList(); // Map of primary key column names to the associated column definitions. Must be set // before the call to analyze(). private Map pkColumnDefByName_; - // Distribution type + // Distribution scheme type private final Type type_; - // Only relevant for hash partitioning, -1 otherwise + // Only relevant for hash-based distribution, -1 otherwise private final int numBuckets_; - // Only relevant for range partitioning, null otherwise - private final List> splitRows_; + // List of range partitions specified in a range-based distribution. + private List rangePartitions_; private DistributeParam(Type t, List colNames, int buckets, - List> splitRows) { + List partitions) { type_ = t; for (String name: colNames) colNames_.add(name.toLowerCase()); + rangePartitions_ = partitions; numBuckets_ = buckets; - splitRows_ = splitRows; } @Override @@ -101,40 +117,26 @@ public class DistributeParam implements ParseNode { Preconditions.checkState(!colNames_.isEmpty()); Preconditions.checkNotNull(pkColumnDefByName_); Preconditions.checkState(!pkColumnDefByName_.isEmpty()); - // Validate the columns specified in the DISTRIBUTE BY clause + // Validate that the columns specified in this distribution are primary key columns. for (String colName: colNames_) { if (!pkColumnDefByName_.containsKey(colName)) { throw new AnalysisException(String.format("Column '%s' in '%s' is not a key " + "column. Only key columns can be used in DISTRIBUTE BY.", colName, toSql())); } } + if (type_ == Type.RANGE) analyzeRangeParam(analyzer); + } - if (type_ == Type.RANGE) { - for (List splitRow : splitRows_) { - if (splitRow.size() != colNames_.size()) { - throw new AnalysisException(String.format( - "SPLIT ROWS has different size than number of projected key columns: %d. 
" - + "Split row: %s", colNames_.size(), splitRowToString(splitRow))); - } - for (int i = 0; i < splitRow.size(); ++i) { - LiteralExpr expr = splitRow.get(i); - ColumnDef colDef = pkColumnDefByName_.get(colNames_.get(i)); - org.apache.impala.catalog.Type colType = colDef.getType(); - Preconditions.checkState(KuduUtil.isSupportedKeyType(colType)); - expr.analyze(analyzer); - org.apache.impala.catalog.Type exprType = expr.getType(); - if (exprType.isNull()) { - throw new AnalysisException("Split values cannot be NULL. Split row: " + - splitRowToString(splitRow)); - } - if (!org.apache.impala.catalog.Type.isImplicitlyCastable(exprType, colType, - true)) { - throw new AnalysisException(String.format("Split value %s (type: %s) is " + - "not type compatible with column '%s' (type: %s).", expr.toSql(), - exprType, colDef.getColName(), colType.toSql())); - } - } - } + /** + * Analyzes a range-based distribution. This function does not check for overlapping + * range partitions; these checks are performed by Kudu and an error is reported back + * to the user. 
+ */ + public void analyzeRangeParam(Analyzer analyzer) throws AnalysisException { + List pkColDefs = Lists.newArrayListWithCapacity(colNames_.size()); + for (String colName: colNames_) pkColDefs.add(pkColumnDefByName_.get(colName)); + for (RangePartition rangePartition: rangePartitions_) { + rangePartition.analyze(analyzer, pkColDefs); } } @@ -150,13 +152,15 @@ public class DistributeParam implements ParseNode { Preconditions.checkState(numBuckets_ != NO_BUCKETS); builder.append(numBuckets_).append(" BUCKETS"); } else { - builder.append(" SPLIT ROWS ("); - if (splitRows_ == null) { - builder.append("..."); - } else { - for (List splitRow: splitRows_) { - builder.append(splitRowToString(splitRow)); + builder.append(" ("); + if (rangePartitions_ != null) { + List partsSql = Lists.newArrayList(); + for (RangePartition rangePartition: rangePartitions_) { + partsSql.add(rangePartition.toSql()); } + builder.append(Joiner.on(", ").join(partsSql)); + } else { + builder.append("..."); } builder.append(")"); } @@ -166,15 +170,6 @@ public class DistributeParam implements ParseNode { @Override public String toString() { return toSql(); } - private String splitRowToString(List splitRow) { - StringBuilder builder = new StringBuilder("("); - for (LiteralExpr expr: splitRow) { - if (builder.length() > 1) builder.append(", "); - builder.append(expr.toSql()); - } - return builder.append(")").toString(); - } - public TDistributeParam toThrift() { TDistributeParam result = new TDistributeParam(); // TODO: Add a validate() function to ensure the validity of distribute params. 
@@ -188,25 +183,12 @@ public class DistributeParam implements ParseNode { Preconditions.checkState(type_ == Type.RANGE); TDistributeByRangeParam rangeParam = new TDistributeByRangeParam(); rangeParam.setColumns(colNames_); - if (splitRows_ == null) { + if (rangePartitions_ == null) { result.setBy_range_param(rangeParam); return result; } - for (List splitRow : splitRows_) { - TRangeLiteralList list = new TRangeLiteralList(); - for (int i = 0; i < splitRow.size(); ++i) { - LiteralExpr expr = splitRow.get(i); - TRangeLiteral literal = new TRangeLiteral(); - if (expr instanceof NumericLiteral) { - literal.setInt_literal(((NumericLiteral)expr).getIntValue()); - } else { - String exprValue = expr.getStringValue(); - Preconditions.checkState(!Strings.isNullOrEmpty(exprValue)); - literal.setString_literal(exprValue); - } - list.addToValues(literal); - } - rangeParam.addToSplit_rows(list); + for (RangePartition rangePartition: rangePartitions_) { + rangeParam.addToRange_partitions(rangePartition.toThrift()); } result.setBy_range_param(rangeParam); } diff --git a/fe/src/main/java/org/apache/impala/analysis/RangePartition.java b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java new file mode 100644 index 000000000..3e41bc460 --- /dev/null +++ b/fe/src/main/java/org/apache/impala/analysis/RangePartition.java @@ -0,0 +1,227 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.impala.analysis; + +import java.util.List; +import org.apache.impala.common.Pair; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +import org.apache.impala.common.AnalysisException; +import org.apache.impala.util.KuduUtil; +import org.apache.impala.thrift.TRangePartition; + +/** + * Represents a range partition of a Kudu table. + * + * The following cases are supported: + * - Bounded on both ends: + * PARTITION l_val <[=] VALUES <[=] u_val + * - Unbounded lower: + * PARTITION VALUES <[=] u_val + * - Unbounded upper: + * PARTITION l_val <[=] VALUES + * - Single value (no range): + * PARTITION VALUE = val + * - Multi-value: + * PARTITION VALUE = (val1, val2, ..., valn) + * + * Internally, all these cases are represented using the quadruplet: + * [(l_val1,..., l_valn), l_bound_type, (u_val1,..., u_valn), u_bound_type], + * where l_bound_type, u_bound_type are boolean values indicating if the associated bounds + * are inclusive (true) or exclusive (false). 
+ */ +public class RangePartition implements ParseNode { + private final List lowerBound_; + private final boolean lowerBoundInclusive_; + private final List upperBound_; + private final boolean upperBoundInclusive_; + private final boolean isSingletonRange_; + + private RangePartition(List lowerBoundValues, boolean lowerBoundInclusive, + List upperBoundValues, boolean upperBoundInclusive) { + Preconditions.checkNotNull(lowerBoundValues); + Preconditions.checkNotNull(upperBoundValues); + Preconditions.checkState(!lowerBoundValues.isEmpty() || !upperBoundValues.isEmpty()); + lowerBound_ = lowerBoundValues; + lowerBoundInclusive_ = lowerBoundInclusive; + upperBound_ = upperBoundValues; + upperBoundInclusive_ = upperBoundInclusive; + isSingletonRange_ = (upperBoundInclusive && lowerBoundInclusive + && lowerBoundValues == upperBoundValues); + } + + /** + * Constructs a range partition. The range is specified in the CREATE TABLE statement + * using the 'PARTITION OP VALUES OP ' clause. 'lower' corresponds to + * the ' OP' pair which defines an optional lower bound. 'upper' corresponds to + * the 'OP ' pair which defines an optional upper bound. Since only '<' and + * '<=' operators are allowed, operators are represented with boolean values that + * indicate inclusive or exclusive bounds. + */ + public static RangePartition createFromRange(Pair lower, + Pair upper) { + List lowerBoundExprs = Lists.newArrayListWithCapacity(1); + boolean lowerBoundInclusive = false; + List upperBoundExprs = Lists.newArrayListWithCapacity(1); + boolean upperBoundInclusive = false; + if (lower != null) { + lowerBoundExprs.add(lower.first); + lowerBoundInclusive = lower.second; + } + if (upper != null) { + upperBoundExprs.add(upper.first); + upperBoundInclusive = upper.second; + } + return new RangePartition(lowerBoundExprs, lowerBoundInclusive, upperBoundExprs, + upperBoundInclusive); + } + + /** + * Constructs a range partition from a set of values. 
The values are specified in the + * CREATE TABLE statement using the 'PARTITION VALUE = ' or the + * 'PARTITION VALUE = (,...,)' clause. For both cases, the generated + * range partition has the same lower and upper bounds. + */ + public static RangePartition createFromValues(List values) { + return new RangePartition(values, true, values, true); + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { + throw new IllegalStateException("Not implemented"); + } + + public void analyze(Analyzer analyzer, List distributionColDefs) + throws AnalysisException { + analyzeBoundaryValues(lowerBound_, distributionColDefs, analyzer); + if (!isSingletonRange_) { + analyzeBoundaryValues(upperBound_, distributionColDefs, analyzer); + } + } + + private void analyzeBoundaryValues(List boundaryValues, + List distributionColDefs, Analyzer analyzer) throws AnalysisException { + if (!boundaryValues.isEmpty() + && boundaryValues.size() != distributionColDefs.size()) { + throw new AnalysisException(String.format("Number of specified range " + + "partition values is different than the number of distribution " + + "columns: (%d vs %d). 
Range partition: '%s'", boundaryValues.size(), + distributionColDefs.size(), toSql())); + } + for (int i = 0; i < boundaryValues.size(); ++i) { + LiteralExpr literal = analyzeBoundaryValue(boundaryValues.get(i), + distributionColDefs.get(i), analyzer); + Preconditions.checkNotNull(literal); + boundaryValues.set(i, literal); + } + } + + private LiteralExpr analyzeBoundaryValue(Expr value, ColumnDef pkColumn, + Analyzer analyzer) throws AnalysisException { + try { + value.analyze(analyzer); + } catch (AnalysisException e) { + throw new AnalysisException(String.format("Only constant values are allowed " + + "for range-partition bounds: %s", value.toSql()), e); + } + if (!value.isConstant()) { + throw new AnalysisException(String.format("Only constant values are allowed " + + "for range-partition bounds: %s", value.toSql())); + } + LiteralExpr literal = LiteralExpr.create(value, analyzer.getQueryCtx()); + if (literal == null) { + throw new AnalysisException(String.format("Only constant values are allowed " + + "for range-partition bounds: %s", value.toSql())); + } + if (literal.getType().isNull()) { + throw new AnalysisException(String.format("Range partition values cannot be " + + "NULL. 
Range partition: '%s'", toSql())); + } + org.apache.impala.catalog.Type colType = pkColumn.getType(); + Preconditions.checkState(KuduUtil.isSupportedKeyType(colType)); + org.apache.impala.catalog.Type literalType = literal.getType(); + if (!org.apache.impala.catalog.Type.isImplicitlyCastable(literalType, colType, + true)) { + throw new AnalysisException(String.format("Range partition value %s " + + "(type: %s) is not type compatible with distribution column '%s' (type: %s).", + literal.toSql(), literalType, pkColumn.getColName(), colType.toSql())); + } + if (!literalType.equals(colType)) { + Expr castLiteral = literal.uncheckedCastTo(colType); + Preconditions.checkNotNull(castLiteral); + literal = LiteralExpr.create(castLiteral, analyzer.getQueryCtx()); + } + Preconditions.checkNotNull(literal); + return literal; + } + + @Override + public String toSql() { + StringBuilder output = new StringBuilder(); + output.append("PARTITION "); + if (isSingletonRange_) { + output.append("VALUE = "); + if (lowerBound_.size() > 1) output.append("("); + List literals = Lists.newArrayList(); + for (Expr literal: lowerBound_) literals.add(literal.toSql()); + output.append(Joiner.on(",").join(literals)); + if (lowerBound_.size() > 1) output.append(")"); + } else { + if (!lowerBound_.isEmpty()) { + Preconditions.checkState(lowerBound_.size() == 1); + output.append(lowerBound_.get(0).toSql() + " " + + (lowerBoundInclusive_ ? "<=" : "<")); + output.append(" "); + } + output.append("VALUES"); + if (!upperBound_.isEmpty()) { + Preconditions.checkState(upperBound_.size() == 1); + output.append(" "); + output.append((upperBoundInclusive_ ? 
"<=" : "<") + " " + + upperBound_.get(0).toSql() ); + } + } + return output.toString(); + } + + public TRangePartition toThrift() { + TRangePartition tRangePartition = new TRangePartition(); + for (Expr literal: lowerBound_) { + tRangePartition.addToLower_bound_values(literal.treeToThrift()); + } + if (!lowerBound_.isEmpty()) { + tRangePartition.setIs_lower_bound_inclusive(lowerBoundInclusive_); + } + for (Expr literal: upperBound_) { + tRangePartition.addToUpper_bound_values(literal.treeToThrift()); + } + if (!upperBound_.isEmpty()) { + tRangePartition.setIs_upper_bound_inclusive(upperBoundInclusive_); + } + Preconditions.checkState(tRangePartition.isSetLower_bound_values() + || tRangePartition.isSetUpper_bound_values()); + return tRangePartition; + } + + public List getLowerBound() { return ImmutableList.copyOf(lowerBound_); } + public List getUpperBound() { return ImmutableList.copyOf(upperBound_); } +} diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java index 7f51717df..18043c2c1 100644 --- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -29,14 +29,15 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.impala.analysis.ToSqlUtils; import org.apache.impala.catalog.KuduTable; import org.apache.impala.catalog.Table; import org.apache.impala.catalog.TableNotFoundException; import org.apache.impala.catalog.Type; import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.common.Pair; import org.apache.impala.thrift.TCreateTableParams; import org.apache.impala.thrift.TDistributeParam; +import org.apache.impala.thrift.TRangePartition; import org.apache.impala.util.KuduUtil; import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; import 
org.apache.kudu.ColumnSchema; @@ -44,6 +45,7 @@ import org.apache.kudu.Schema; import org.apache.kudu.client.CreateTableOptions; import org.apache.kudu.client.KuduClient; import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RangePartitionBound; import org.apache.log4j.Logger; /** @@ -91,7 +93,7 @@ public class KuduCatalogOpExecutor { Set keyColNames = new HashSet<>(params.getPrimary_key_column_names()); List fieldSchemas = msTbl.getSd().getCols(); List colSchemas = new ArrayList<>(fieldSchemas.size()); - for (FieldSchema fieldSchema : fieldSchemas) { + for (FieldSchema fieldSchema: fieldSchemas) { Type type = Type.parseColumnType(fieldSchema.getType()); Preconditions.checkState(type != null); org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type); @@ -117,7 +119,7 @@ public class KuduCatalogOpExecutor { List distributeParams = params.getDistribute_by(); if (distributeParams != null) { boolean hasRangePartitioning = false; - for (TDistributeParam distParam : distributeParams) { + for (TDistributeParam distParam: distributeParams) { if (distParam.isSetBy_hash_param()) { Preconditions.checkState(!distParam.isSetBy_range_param()); tableOpts.addHashPartitions(distParam.getBy_hash_param().getColumns(), @@ -125,11 +127,22 @@ public class KuduCatalogOpExecutor { } else { Preconditions.checkState(distParam.isSetBy_range_param()); hasRangePartitioning = true; - tableOpts.setRangePartitionColumns( - distParam.getBy_range_param().getColumns()); - for (PartialRow partialRow : - KuduUtil.parseSplits(schema, distParam.getBy_range_param())) { - tableOpts.addSplitRow(partialRow); + List rangePartitionColumns = distParam.getBy_range_param().getColumns(); + tableOpts.setRangePartitionColumns(rangePartitionColumns); + for (TRangePartition rangePartition: + distParam.getBy_range_param().getRange_partitions()) { + Preconditions.checkState(rangePartition.isSetLower_bound_values() + || rangePartition.isSetUpper_bound_values()); + Pair lowerBound = + 
KuduUtil.buildRangePartitionBound(schema, rangePartitionColumns, + rangePartition.getLower_bound_values(), + rangePartition.isIs_lower_bound_inclusive()); + Pair upperBound = + KuduUtil.buildRangePartitionBound(schema, rangePartitionColumns, + rangePartition.getUpper_bound_values(), + rangePartition.isIs_upper_bound_inclusive()); + tableOpts.addRangePartition(lowerBound.first, upperBound.first, + lowerBound.second, upperBound.second); } } } diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index 6e071165e..9ebc48082 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -17,143 +17,104 @@ package org.apache.impala.util; -import java.io.StringReader; import java.util.HashSet; import java.util.List; -import javax.json.Json; -import javax.json.JsonArray; -import javax.json.JsonReader; -import org.apache.impala.catalog.Catalog; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.Type; import org.apache.impala.common.ImpalaRuntimeException; -import org.apache.impala.thrift.TDistributeByRangeParam; -import org.apache.impala.thrift.TRangeLiteral; -import org.apache.impala.thrift.TRangeLiteralList; -import com.google.common.base.Splitter; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import org.apache.impala.common.Pair; +import org.apache.impala.thrift.TExpr; +import org.apache.impala.thrift.TExprNode; + import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.PartialRow; +import org.apache.kudu.client.RangePartitionBound; import static java.lang.String.format; public class KuduUtil { - private static final 
String SPLIT_KEYS_ERROR_MESSAGE = "Error parsing splits keys."; private static final String KUDU_TABLE_NAME_PREFIX = "impala::"; /** - * Parses split keys from statements. - * - * Split keys are expected to be in json, as an array of arrays, in the form: - * '[[value1_col1, value1_col2, ...], [value2_col1, value2_col2, ...], ...]' - * - * Each inner array corresponds to a split key and should have one matching entry for - * each key column specified in 'schema'. + * Creates a PartialRow from a list of range partition boundary values. */ - public static List parseSplits(Schema schema, String kuduSplits) + private static PartialRow parseRangePartitionBoundaryValues(Schema schema, + List rangePartitionColumns, List boundaryValues) throws ImpalaRuntimeException { - - // If there are no splits return early. - if (kuduSplits == null || kuduSplits.isEmpty()) return ImmutableList.of(); - - ImmutableList.Builder splitRows = ImmutableList.builder(); - - // ...Otherwise parse the splits. We're expecting splits in the format of a list of - // lists of keys. We only support specifying splits for int and string keys - // (currently those are the only type of keys allowed in Kudu too). 
- try { - JsonReader jr = Json.createReader(new StringReader(kuduSplits)); - JsonArray keysList = jr.readArray(); - for (int i = 0; i < keysList.size(); i++) { - PartialRow splitRow = new PartialRow(schema); - JsonArray compoundKey = keysList.getJsonArray(i); - if (compoundKey.size() != schema.getPrimaryKeyColumnCount()) { - throw new ImpalaRuntimeException(SPLIT_KEYS_ERROR_MESSAGE + - " Wrong number of keys."); - } - for (int j = 0; j < compoundKey.size(); j++) { - setKey(schema.getColumnByIndex(j).getType(), compoundKey, j, splitRow); - } - splitRows.add(splitRow); - } - } catch (ImpalaRuntimeException e) { - throw e; - } catch (Exception e) { - throw new ImpalaRuntimeException(SPLIT_KEYS_ERROR_MESSAGE + " Problem parsing json" - + ": " + e.getMessage(), e); + Preconditions.checkState(rangePartitionColumns.size() == boundaryValues.size()); + PartialRow bound = new PartialRow(schema); + for (int i = 0; i < boundaryValues.size(); ++i) { + String colName = rangePartitionColumns.get(i); + ColumnSchema col = schema.getColumn(colName); + Preconditions.checkNotNull(col); + setKey(col.getType(), boundaryValues.get(i), schema.getColumnIndex(colName), + colName, bound); } - - return splitRows.build(); + return bound; } /** - * Given the TDistributeByRangeParam from the CREATE statement, creates the - * appropriate split rows. + * Builds and returns a range-partition bound used in the creation of a Kudu + * table. The range-partition bound consists of a PartialRow with the boundary + * values and a RangePartitionBound indicating if the bound is inclusive or exclusive. + * Throws an ImpalaRuntimeException if an error occurs while parsing the boundary + * values. 
*/ - public static List parseSplits(Schema schema, - TDistributeByRangeParam param) throws ImpalaRuntimeException { - ImmutableList.Builder splitRows = ImmutableList.builder(); - for (TRangeLiteralList literals : param.getSplit_rows()) { - PartialRow splitRow = new PartialRow(schema); - List literalValues = literals.getValues(); - for (int i = 0; i < literalValues.size(); ++i) { - String colName = param.getColumns().get(i); - ColumnSchema col = schema.getColumn(colName); - setKey(col.getType(), literalValues.get(i), schema.getColumnIndex(colName), - colName, splitRow); - } - splitRows.add(splitRow); + public static Pair buildRangePartitionBound( + Schema schema, List rangePartitionColumns, List boundaryValues, + boolean isInclusiveBound) throws ImpalaRuntimeException { + if (boundaryValues == null || boundaryValues.isEmpty()) { + // TODO: Do we need to set the bound type? + return new Pair(new PartialRow(schema), + RangePartitionBound.INCLUSIVE_BOUND); } - return splitRows.build(); + PartialRow bound = + parseRangePartitionBoundaryValues(schema, rangePartitionColumns, boundaryValues); + RangePartitionBound boundType = null; + if (isInclusiveBound) { + boundType = RangePartitionBound.INCLUSIVE_BOUND; + } else { + boundType = RangePartitionBound.EXCLUSIVE_BOUND; + } + return new Pair(bound, boundType); } /** - * Sets the value in 'key' at 'pos', given the json representation. + * Sets the value 'boundaryVal' in 'key' at 'pos'. Checks if 'boundaryVal' has the + * expected data type. 
*/ - private static void setKey(org.apache.kudu.Type type, JsonArray array, int pos, - PartialRow key) throws ImpalaRuntimeException { - switch (type) { - case INT8: key.addByte(pos, (byte) array.getInt(pos)); break; - case INT16: key.addShort(pos, (short) array.getInt(pos)); break; - case INT32: key.addInt(pos, array.getInt(pos)); break; - case INT64: key.addLong(pos, array.getJsonNumber(pos).longValue()); break; - case STRING: key.addString(pos, array.getString(pos)); break; - default: - throw new ImpalaRuntimeException("Key columns not supported for type: " - + type.toString()); - } - } - - /** - * Sets the value in 'key' at 'pos', given the range literal. - */ - private static void setKey(org.apache.kudu.Type type, TRangeLiteral literal, int pos, + private static void setKey(org.apache.kudu.Type type, TExpr boundaryVal, int pos, String colName, PartialRow key) throws ImpalaRuntimeException { + Preconditions.checkState(boundaryVal.getNodes().size() == 1); + TExprNode literal = boundaryVal.getNodes().get(0); switch (type) { case INT8: checkCorrectType(literal.isSetInt_literal(), type, colName, literal); - key.addByte(pos, (byte) literal.getInt_literal()); + key.addByte(pos, (byte) literal.getInt_literal().getValue()); break; case INT16: checkCorrectType(literal.isSetInt_literal(), type, colName, literal); - key.addShort(pos, (short) literal.getInt_literal()); + key.addShort(pos, (short) literal.getInt_literal().getValue()); break; case INT32: checkCorrectType(literal.isSetInt_literal(), type, colName, literal); - key.addInt(pos, (int) literal.getInt_literal()); + key.addInt(pos, (int) literal.getInt_literal().getValue()); break; case INT64: checkCorrectType(literal.isSetInt_literal(), type, colName, literal); - key.addLong(pos, literal.getInt_literal()); + key.addLong(pos, literal.getInt_literal().getValue()); break; case STRING: checkCorrectType(literal.isSetString_literal(), type, colName, literal); - key.addString(pos, literal.getString_literal()); + 
key.addString(pos, literal.getString_literal().getValue()); break; default: throw new ImpalaRuntimeException("Key columns not supported for type: " @@ -166,11 +127,11 @@ public class KuduUtil { * indicating problems with the type of the literal of the range literal. */ private static void checkCorrectType(boolean correctType, org.apache.kudu.Type t, - String colName, TRangeLiteral literal) throws ImpalaRuntimeException { + String colName, TExprNode boundaryVal) throws ImpalaRuntimeException { if (correctType) return; throw new ImpalaRuntimeException( - format("Expected %s literal for column '%s' got '%s'", t.getName(), colName, - toString(literal))); + format("Expected '%s' literal for column '%s' got '%s'", t.getName(), colName, + Type.fromThrift(boundaryVal.getType()).toSql())); } /** @@ -250,13 +211,4 @@ public class KuduUtil { "Kudu type '%s' is not supported in Impala", t.getName())); } } - - /** - * Returns the string value of the RANGE literal. - */ - static String toString(TRangeLiteral l) throws ImpalaRuntimeException { - if (l.isSetString_literal()) return String.valueOf(l.string_literal); - if (l.isSetInt_literal()) return String.valueOf(l.int_literal); - throw new ImpalaRuntimeException("Unsupported type for RANGE literal."); - } } diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java index 1f718d58e..b00592492 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -1349,6 +1349,16 @@ public class AnalyzeDDLTest extends FrontendTestBase { " stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " + "bigint_col, float_col, double_col, date_string_col, string_col " + "from functional.alltypestiny"); + AnalyzesOk("create table t primary key (id) distribute by range (id) " + + "(partition values < 10, partition 20 <= values < 30, partition value = 50) " + + 
"stored as kudu as select id, bool_col, tinyint_col, smallint_col, int_col, " + + "bigint_col, float_col, double_col, date_string_col, string_col " + + "from functional.alltypestiny"); + AnalyzesOk("create table t primary key (id) distribute by hash (id) into 3 buckets, "+ + "range (id) (partition values < 10, partition 10 <= values < 20, " + + "partition value = 30) stored as kudu as select id, bool_col, tinyint_col, " + + "smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, " + + "string_col from functional.alltypestiny"); // CTAS in an external Kudu table AnalysisError("create external table t stored as kudu " + "tblproperties('kudu.table_name'='t') as select id, int_col from " + @@ -1743,11 +1753,39 @@ public class AnalyzeDDLTest extends FrontendTestBase { "distribute by hash(x) into 8 buckets stored as kudu"); AnalyzesOk("create table tab (x int, y int, primary key(x, y)) " + "distribute by hash(y) into 8 buckets stored as kudu"); + AnalyzesOk("create table tab (x int, y string, primary key (x)) distribute by " + + "hash (x) into 3 buckets, range (x) (partition values < 1, partition " + + "1 <= values < 10, partition 10 <= values < 20, partition value = 30) " + + "stored as kudu"); + AnalyzesOk("create table tab (x int, y int, primary key (x, y)) distribute by " + + "range (x, y) (partition value = (2001, 1), partition value = (2002, 1), " + + "partition value = (2003, 2)) stored as kudu"); + // Non-literal boundary values in range partitions + AnalyzesOk("create table tab (x int, y int, primary key (x)) distribute by " + + "range (x) (partition values < 1 + 1, partition (1+3) + 2 < values < 10, " + + "partition factorial(4) < values < factorial(5), " + + "partition value = factorial(6)) stored as kudu"); + AnalyzesOk("create table tab (x int, y int, primary key(x, y)) distribute by " + + "range(x, y) (partition value = (1+1, 2+2), partition value = ((1+1+1)+1, 10), " + + "partition value = (cast (30 as int), factorial(5))) stored as 
kudu"); + AnalysisError("create table tab (x int primary key) distribute by range (x) " + + "(partition values < x + 1) stored as kudu", "Only constant values are allowed " + + "for range-partition bounds: x + 1"); + AnalysisError("create table tab (x int primary key) distribute by range (x) " + + "(partition values <= isnull(null, null)) stored as kudu", "Range partition " + + "values cannot be NULL. Range partition: 'PARTITION VALUES <= " + + "isnull(NULL, NULL)'"); + AnalysisError("create table tab (x int primary key) distribute by range (x) " + + "(partition values <= (select count(*) from functional.alltypestiny)) " + + "stored as kudu", "Only constant values are allowed for range-partition " + + "bounds: (SELECT count(*) FROM functional.alltypestiny)"); // Multilevel partitioning. Data is split into 3 buckets based on 'x' and each - // bucket is partitioned into 4 tablets based on the split points of 'y'. + // bucket is partitioned into 4 tablets based on the range partitions of 'y'. AnalyzesOk("create table tab (x int, y string, primary key(x, y)) " + - "distribute by hash(x) into 3 buckets, range(y) split rows " + - "(('aa'), ('bb'), ('cc')) stored as kudu"); + "distribute by hash(x) into 3 buckets, range(y) " + + "(partition values < 'aa', partition 'aa' <= values < 'bb', " + + "partition 'bb' <= values < 'cc', partition 'cc' <= values) " + + "stored as kudu"); // Key column in upper case AnalyzesOk("create table tab (x int, y int, primary key (X)) " + "distribute by hash (x) into 8 buckets stored as kudu"); @@ -1762,15 +1800,19 @@ public class AnalyzeDDLTest extends FrontendTestBase { // Column names in distribute params should also be case-insensitive. 
AnalyzesOk("create table tab (a int, b int, c int, d int, primary key(a, b, c, d))" + "distribute by hash (a, B, c) into 8 buckets, " + - "range (A) split rows ((1),(2),(3)) stored as kudu"); + "range (A) (partition values < 1, partition 1 <= values < 2, " + + "partition 2 <= values < 3, partition 3 <= values < 4, partition 4 <= values) " + + "stored as kudu"); // Allowing range distribution on a subset of the primary keys AnalyzesOk("create table tab (id int, name string, valf float, vali bigint, " + - "primary key (id, name)) distribute by range (name) split rows (('abc')) " + - "stored as kudu"); - // Null values in SPLIT ROWS + "primary key (id, name)) distribute by range (name) " + + "(partition 'aa' < values <= 'bb') stored as kudu"); + // Null values in range partition values AnalysisError("create table tab (id int, name string, primary key(id, name)) " + - "distribute by hash (id) into 3 buckets, range (name) split rows ((null),(1)) " + - "stored as kudu", "Split values cannot be NULL. Split row: (NULL)"); + "distribute by hash (id) into 3 buckets, range (name) " + + "(partition value = null, partition value = 1) stored as kudu", + "Range partition values cannot be NULL. Range partition: 'PARTITION " + + "VALUE = NULL'"); // Primary key specified in tblproperties AnalysisError(String.format("create table tab (x int) distribute by hash (x) " + "into 8 buckets stored as kudu tblproperties ('%s' = 'x')", @@ -1793,32 +1835,53 @@ public class AnalyzeDDLTest extends FrontendTestBase { AnalysisError("create table tab (x int, primary key (x, x)) distribute by hash (x) " + "into 8 buckets stored as kudu", "Column 'x' is listed multiple times as a PRIMARY KEY."); - // Each split row size should equals to the number of range columns. + // Number of range partition boundary values should be equal to the number of range + // columns. 
AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " + - "distribute by range(a) split rows ((1,'extra_val'),(2),(3)) stored as kudu", - "SPLIT ROWS has different size than number of projected key columns: 1. " + - "Split row: (1, 'extra_val')"); + "distribute by range(a) (partition value = (1, 2), " + + "partition value = 3, partition value = 4) stored as kudu", + "Number of specified range partition values is different than the number of " + + "distribution columns: (2 vs 1). Range partition: 'PARTITION VALUE = (1,2)'"); // Key ranges must match the column types. AnalysisError("create table tab (a int, b int, c int, d int, primary key(a, b, c)) " + - "distribute by hash (a, b, c) into 8 buckets, " + - "range (a) split rows ((1), ('abc'), (3)) stored as kudu", - "Split value 'abc' (type: STRING) is not type compatible with column 'a'" + - " (type: INT)."); + "distribute by hash (a, b, c) into 8 buckets, range (a) " + + "(partition value = 1, partition value = 'abc', partition 3 <= values) " + + "stored as kudu", "Range partition value 'abc' (type: STRING) is not type " + + "compatible with distribution column 'a' (type: INT)."); + AnalysisError("create table tab (a tinyint primary key) distribute by range (a) " + + "(partition value = 128) stored as kudu", "Range partition value 128 " + + "(type: SMALLINT) is not type compatible with distribution column 'a' " + + "(type: TINYINT)"); + AnalysisError("create table tab (a smallint primary key) distribute by range (a) " + + "(partition value = 32768) stored as kudu", "Range partition value 32768 " + + "(type: INT) is not type compatible with distribution column 'a' " + + "(type: SMALLINT)"); + AnalysisError("create table tab (a int primary key) distribute by range (a) " + + "(partition value = 2147483648) stored as kudu", "Range partition value " + + "2147483648 (type: BIGINT) is not type compatible with distribution column 'a' " + + "(type: INT)"); + AnalysisError("create table tab (a 
bigint primary key) distribute by range (a) " + + "(partition value = 9223372036854775808) stored as kudu", "Range partition " + + "value 9223372036854775808 (type: DECIMAL(19,0)) is not type compatible with " + + "distribution column 'a' (type: BIGINT)"); + // Test implicit casting/folding of partition values. + AnalyzesOk("create table tab (a int primary key) distribute by range (a) " + + "(partition value = false, partition value = true) stored as kudu"); // Non-key column used in DISTRIBUTE BY AnalysisError("create table tab (a int, b string, c bigint, primary key (a)) " + - "distribute by range (b) split rows (('abc')) stored as kudu", - "Column 'b' in 'RANGE (b) SPLIT ROWS (('abc'))' is not a key column. " + + "distribute by range (b) (partition value = 'abc') stored as kudu", + "Column 'b' in 'RANGE (b) (PARTITION VALUE = 'abc')' is not a key column. " + "Only key columns can be used in DISTRIBUTE BY."); - // No float split keys + // No float range partition values AnalysisError("create table tab (a int, b int, c int, d int, primary key (a, b, c))" + "distribute by hash (a, b, c) into 8 buckets, " + - "range (a) split rows ((1.2), ('abc'), (3)) stored as kudu", - "Split value 1.2 (type: DECIMAL(2,1)) is not type compatible with column 'a' " + - "(type: INT)."); + "range (a) (partition value = 1.2, partition value = 2) stored as kudu", + "Range partition value 1.2 (type: DECIMAL(2,1)) is not type compatible with " + + "distribution column 'a' (type: INT)."); // Non-existing column used in DISTRIBUTE BY AnalysisError("create table tab (a int, b int, primary key (a, b)) " + - "distribute by range(unknown_column) split rows (('abc')) stored as kudu", - "Column 'unknown_column' in 'RANGE (unknown_column) SPLIT ROWS (('abc'))' " + + "distribute by range(unknown_column) (partition value = 'abc') stored as kudu", + "Column 'unknown_column' in 'RANGE (unknown_column) (PARTITION VALUE = 'abc')' " + "is not a key column. 
Only key columns can be used in DISTRIBUTE BY"); // Kudu table name is specified in tblproperties AnalyzesOk("create table tab (x int primary key) distribute by hash (x) " + @@ -1827,8 +1890,9 @@ public class AnalyzeDDLTest extends FrontendTestBase { "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081')"); // No port is specified in kudu master address AnalyzesOk("create table tdata_no_port (id int primary key, name string, " + - "valf float, vali bigint) DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) " + - "STORED AS KUDU tblproperties('kudu.master_addresses'='127.0.0.1')"); + "valf float, vali bigint) distribute by range(id) (partition values <= 10, " + + "partition 10 < values <= 30, partition 30 < values) " + + "stored as kudu tblproperties('kudu.master_addresses'='127.0.0.1')"); // Not using the STORED AS KUDU syntax to specify a Kudu table AnalysisError("create table tab (x int primary key) tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')", diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index 6dab5f38b..c163d7089 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -2381,34 +2381,46 @@ public class ParserTest { ParsesOk("CREATE TABLE Foo (i int, k int) DISTRIBUTE BY HASH INTO 4 BUCKETS," + " HASH(k) INTO 4 BUCKETS"); ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY HASH(i)"); - - // Range partitioning, the split rows are not validated in the parser - ParsesOk("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i) " + - "SPLIT ROWS ((1, 2.0, 'asdas'))"); - ParsesOk("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE " + - "SPLIT ROWS ((1, 2.0, 'asdas'))"); - - ParsesOk("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i) " + - "SPLIT ROWS (('asdas'))"); - - ParsesOk("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i) " + - "SPLIT ROWS ((1, 2.0, 'asdas'), (2,3.0, 
'adas'))"); - - ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i) " + - "SPLIT ROWS ()"); - ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i)"); ParserError("CREATE EXTERNAL TABLE Foo DISTRIBUTE BY HASH INTO 4 BUCKETS"); - // Combine both - ParsesOk("CREATE TABLE Foo (i int) DISTRIBUTE BY HASH(i) INTO 4 BUCKETS, RANGE(i) " + - "SPLIT ROWS ((1, 2.0, 'asdas'))"); + // Range partitioning + ParsesOk("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE (PARTITION VALUE = 10)"); + ParsesOk("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i) " + + "(PARTITION 1 <= VALUES < 10, PARTITION 10 <= VALUES < 20, " + + "PARTITION 21 < VALUES <= 30, PARTITION VALUE = 50)"); + ParsesOk("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE(a) " + + "(PARTITION 10 <= VALUES)"); + ParsesOk("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE(a) " + + "(PARTITION VALUES < 10)"); + ParsesOk("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " + + "(PARTITION VALUE = 10, PARTITION VALUE = 20)"); + ParsesOk("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE(a) " + + "(PARTITION VALUES <= 10, PARTITION VALUE = 20)"); + ParsesOk("CREATE TABLE Foo (a int, b int) DISTRIBUTE BY RANGE(a, b) " + + "(PARTITION VALUE = (2001, 1), PARTITION VALUE = (2001, 2), " + + "PARTITION VALUE = (2002, 1))"); + ParsesOk("CREATE TABLE Foo (a int, b string) DISTRIBUTE BY " + + "HASH (a) INTO 3 BUCKETS, RANGE (a, b) (PARTITION VALUE = (1, 'abc'), " + + "PARTITION VALUE = (2, 'def'))"); + ParsesOk("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " + + "(PARTITION VALUE = 1 + 1) STORED AS KUDU"); + ParsesOk("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " + + "(PARTITION 1 + 1 < VALUES) STORED AS KUDU"); + ParsesOk("CREATE TABLE Foo (a int, b int) DISTRIBUTE BY RANGE (a) " + + "(PARTITION b < VALUES <= a) STORED AS KUDU"); + ParsesOk("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " + + "(PARTITION now() <= VALUES, PARTITION VALUE = add_months(now(), 2)) " + + "STORED AS KUDU"); - // Can only have one range clause - 
ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY HASH(i) INTO 4 BUCKETS, RANGE(i) " + - "SPLIT ROWS ((1, 2.0, 'asdas')), RANGE(i) SPLIT ROWS ((1, 2.0, 'asdas'))"); - // Range needs to be the last DISTRIBUTE BY clause - ParserError("CREATE TABLE Foo (i int) DISTRIBUTE BY RANGE(i) SPLIT ROWS ((1),(2)), " + - "HASH (i) INTO 3 BUCKETS"); + ParserError("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) ()"); + ParserError("CREATE TABLE Foo (a int) DISTRIBUTE BY HASH (a) INTO 4 BUCKETS, " + + "RANGE (a) (PARTITION VALUE = 10), RANGE (a) (PARTITION VALUES < 10)"); + ParserError("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " + + "(PARTITION VALUE = 10), HASH (a) INTO 3 BUCKETS"); + ParserError("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " + + "(PARTITION VALUES = 10) STORED AS KUDU"); + ParserError("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " + + "(PARTITION 10 < VALUE < 20) STORED AS KUDU"); } @Test @@ -2550,10 +2562,6 @@ public class ParserTest { "AS SELECT * from bar"); ParsesOk("CREATE TABLE Foo PRIMARY KEY (a, b) DISTRIBUTE BY HASH (b) INTO 2 " + "BUCKETS AS SELECT * from bar"); - ParsesOk("CREATE TABLE Foo PRIMARY KEY (a, b) DISTRIBUTE BY RANGE (b) SPLIT ROWS " + - "(('foo'), ('bar')) STORED AS KUDU AS SELECT * from bar"); - ParsesOk("CREATE TABLE Foo PRIMARY KEY (a, b) DISTRIBUTE BY RANGE SPLIT ROWS " + - "(('foo'), ('bar')) STORED AS KUDU AS SELECT * from bar"); // With clause works ParsesOk("CREATE TABLE Foo AS with t1 as (select 1) select * from t1"); @@ -2586,6 +2594,9 @@ public class ParserTest { ParserError("CREATE TABLE Foo DISTRIBUTE BY HASH(i) INTO 4 BUCKETS AS SELECT 1"); ParsesOk("CREATE TABLE Foo PRIMARY KEY (a) DISTRIBUTE BY HASH(a) INTO 4 BUCKETS " + "TBLPROPERTIES ('a'='b', 'c'='d') AS SELECT * from bar"); + ParsesOk("CREATE TABLE Foo PRIMARY KEY (a) DISTRIBUTE BY RANGE(a) " + + "(PARTITION 1 < VALUES < 10, PARTITION 10 <= VALUES < 20, PARTITION VALUE = 30) " + + "STORED AS KUDU AS SELECT * FROM Bar"); } @Test diff --git 
a/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java b/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java deleted file mode 100644 index 3827d72f7..000000000 --- a/fe/src/test/java/org/apache/impala/util/KuduUtilTest.java +++ /dev/null @@ -1,120 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.impala.util; - -import java.util.List; - -import org.apache.impala.common.ImpalaRuntimeException; -import com.google.common.collect.ImmutableList; -import org.junit.Test; -import org.apache.kudu.ColumnSchema; -import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; -import org.apache.kudu.Schema; -import org.apache.kudu.Type; -import org.apache.kudu.client.PartialRow; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -public class KuduUtilTest { - - private ColumnSchema newKeyColumn(String name, Type type) { - ColumnSchemaBuilder csb = new ColumnSchemaBuilder(name, type); - csb.key(true); - csb.nullable(false); - return csb.build(); - } - - // Tests that we're able to parse splits for a single key of 'string' type. 
- // TODO: this test doesn't make assertions on the resulting encoded key as that - // api is not public in Kudu. Maybe we should make it public? It's hard to test - // otherwise. - @Test - public void testParseSplitKeysSingleKeyStringColumn() throws ImpalaRuntimeException { - String json = "[[\"a\"],[\"b\"]]"; - List keyCols = ImmutableList.of(newKeyColumn("col1", Type.STRING)); - Schema schema = new Schema(keyCols); - List splitRows = KuduUtil.parseSplits(schema.getRowKeyProjection(), json); - assertEquals(splitRows.size(), 2); - } - - // Tests that we're able to parse splits for a single key of 'bigint' type. - // TODO: this test doesn't make assertions on the resulting encoded key as that - // api is not public in Kudu. Maybe we should make it public? It's hard to test - // otherwise. - @Test - public void testParseSplitKeysSingleKeyIntColumn() throws ImpalaRuntimeException { - String json = "[[10], [100]]"; - List keyCols = ImmutableList.of(newKeyColumn("col1", Type.INT64)); - Schema schema = new Schema(keyCols); - List splitRows = KuduUtil.parseSplits(schema.getRowKeyProjection(), json); - assertEquals(splitRows.size(), 2); - } - - // Tests that we're able to parse splits for a compound key of 'string', 'bigint' type. - // TODO: this test doesn't make assertions on the resulting encoded key as that - // api is not public in Kudu. Maybe we should make it public? It's hard to test - // otherwise. - @Test - public void testParseSplitKeysCompoundStringAndIntColumns() - throws ImpalaRuntimeException { - String json = "[[\"a\", 100], [\"b\", 1000]]"; - List keyCols = ImmutableList.of(newKeyColumn("col1", Type.STRING), - newKeyColumn("col2", Type.INT64)); - Schema schema = new Schema(keyCols); - List splitRows = KuduUtil.parseSplits(schema.getRowKeyProjection(), json); - assertEquals(splitRows.size(), 2); - } - - // Tests that we get an exception of the splits keys are of the wrong type, for a single - // key column. 
- @Test - public void testFailToParseKeysWrongType() { - String json = "[[\"a\"]]"; - List keyCols = ImmutableList.of(newKeyColumn("col1", Type.INT64), - newKeyColumn("col2", Type.INT64)); - Schema schema = new Schema(keyCols); - Exception caught = null; - try { - KuduUtil.parseSplits(schema.getRowKeyProjection(), json); - } catch (Exception e) { - caught = e; - } - assertNotNull(caught); - assertTrue(caught instanceof ImpalaRuntimeException); - } - - // Tests that we get an exception of the splits keys have the wrong number of - // keys. - @Test - public void testFailToParseKeysWrongNumber() { - String json = "[[\"a\", 10]]"; - List keyCols = ImmutableList.of(newKeyColumn("col1", Type.INT64), - newKeyColumn("col2", Type.INT64)); - Schema schema = new Schema(keyCols); - Exception caught = null; - try { - KuduUtil.parseSplits(schema.getRowKeyProjection(), json); - } catch (Exception e) { - caught = e; - } - assertNotNull(caught); - assertTrue(caught instanceof ImpalaRuntimeException); - } -} diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index bfe8f39bc..bed471309 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -766,7 +766,8 @@ create table {db_name}{db_suffix}.{table_name} ( name string, zip int ) -distribute by range(id) split rows ((1003), (1007)) stored as kudu; +distribute by range(id) (partition values <= 1003, partition 1003 < values <= 1007, +partition 1007 < values) stored as kudu; ==== ---- DATASET functional @@ -789,7 +790,8 @@ create table {db_name}{db_suffix}.{table_name} ( name string, zip int ) -distribute by range(id) split rows ((1003), (1007)) stored as kudu; +distribute by range(id) (partition values <= 1003, partition 1003 < values <= 1007, +partition 1007 < values) stored as kudu; ==== ---- DATASET functional @@ -815,7 +817,8 @@ create table 
{db_name}{db_suffix}.{table_name} ( alltypes_id int, primary key (test_id, test_name, test_zip, alltypes_id) ) -distribute by range(test_id) split rows ((1003), (1007)) stored as kudu; +distribute by range(test_id) (partition values <= 1003, partition 1003 < values <= 1007, +partition 1007 < values) stored as kudu; ==== ---- DATASET functional @@ -1261,7 +1264,8 @@ create table {db_name}{db_suffix}.{table_name} ( a string primary key, b string ) -distribute by range(a) split rows (('b'), ('d')) stored as kudu; +distribute by range(a) (partition values <= 'b', partition 'b' < values <= 'd', +partition 'd' < values) stored as kudu; ==== ---- DATASET functional @@ -1280,7 +1284,9 @@ DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}; create table {db_name}{db_suffix}.{table_name} ( int_col int primary key ) -distribute by range(int_col) split rows ((2), (4), (6), (8)) stored as kudu; +distribute by range(int_col) (partition values <= 2, partition 2 < values <= 4, +partition 4 < values <= 6, partition 6 < values <= 8, partition 8 < values) +stored as kudu; ==== ---- DATASET functional @@ -1405,14 +1411,17 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/ImpalaDemoDataset/DEC_00_SF3_P077 ---- CREATE_KUDU DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}; create table {db_name}{db_suffix}.{table_name} ( - id string, + id string primary key, zip string, description1 string, description2 string, - income int, - primary key (id, zip)) -distribute by range(id, zip) split rows (('8600000US01475', '01475'), ('8600000US63121', '63121'), ('8600000US84712', '84712')) -stored as kudu; + income int) +distribute by range(id) +(partition values <= '8600000US01475', + partition '8600000US01475' < values <= '8600000US63121', + partition '8600000US63121' < values <= '8600000US84712', + partition '8600000US84712' < values +) stored as kudu; ==== ---- DATASET functional diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test index 78a01df26..a65634141 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test @@ -5,9 +5,9 @@ PLAN-ROOT SINK 00:SCAN KUDU [functional_kudu.testtbl] ---- SCANRANGELOCATIONS NODE 0: - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1003), (int64 id=1007))} - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1007), )} - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1003))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1004), (int64 id=1008))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1008), )} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1004))} ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -23,9 +23,9 @@ PLAN-ROOT SINK kudu predicates: name = '10' ---- SCANRANGELOCATIONS NODE 0: - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1003), (int64 id=1007))} - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1007), )} - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1003))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1004), (int64 id=1008))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1008), )} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1004))} ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -111,7 +111,7 @@ PLAN-ROOT SINK kudu predicates: id <= 20, zip <= 30, id >= 10, zip < 50, zip <= 5, zip > 1, zip >= 0, name = 'foo' ---- SCANRANGELOCATIONS NODE 0: - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1003))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1004))} 
---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -131,7 +131,7 @@ PLAN-ROOT SINK kudu predicates: id <= 60, id < 40, id < 103 ---- SCANRANGELOCATIONS NODE 0: - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1003))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1004))} ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -152,9 +152,9 @@ PLAN-ROOT SINK kudu predicates: name = 'a' ---- SCANRANGELOCATIONS NODE 0: - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1003), (int64 id=1007))} - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1007), )} - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1003))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1004), (int64 id=1008))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1008), )} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1004))} ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -175,9 +175,9 @@ PLAN-ROOT SINK predicates: name IS NULL, CAST(sin(id) AS BOOLEAN) = TRUE ---- SCANRANGELOCATIONS NODE 0: - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1003), (int64 id=1007))} - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1007), )} - ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1003))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1004), (int64 id=1008))} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [(int64 id=1008), )} + ScanToken{table=impala::functional_kudu.testtbl, range-partition: [, (int64 id=1004))} ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test index b2c56c46f..3d9916d1a 100644 --- 
a/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_create.test @@ -62,8 +62,8 @@ NonRecoverableException: hash bucket schema components must not contain columns ---- QUERY # Kudu table that uses Impala keywords as table name and column names create table `add`(`analytic` int, `function` int, primary key(`analytic`, `function`)) -distribute by hash (`analytic`) into 4 buckets, range (`function`) split rows ((1), (10)) -stored as kudu; +distribute by hash (`analytic`) into 4 buckets, range (`function`) +(partition values <= 1, partition 1 < values <= 10, partition 10 < values) stored as kudu; insert into `add` select id, int_col from functional.alltypestiny; select * from `add` ---- RESULTS @@ -78,3 +78,9 @@ select * from `add` ---- TYPES INT,INT ==== +---- QUERY +# Test implicit casting/folding of partition values. +create table tab (a int primary key) distribute by range (a) (partition value = false) +stored as kudu +---- RESULTS +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test index a2f959954..278290802 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test @@ -2,7 +2,7 @@ ---- QUERY -- Invalid hostname create table tdata_bogus_host (id int primary key, name string, valf float, vali bigint) - DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU + DISTRIBUTE BY RANGE (PARTITION 10 <= VALUES <= 30) STORED AS KUDU TBLPROPERTIES('kudu.master_addresses' = 'bogus host name') ---- CATCH Couldn't resolve this master's address bogus host name:7051 @@ -11,7 +11,7 @@ Couldn't resolve this master's address bogus host name:7051 -- Non-existing host create table tdata_non_existing_host (id int primary key, name string, valf float, vali bigint) - DISTRIBUTE BY RANGE SPLIT ROWS 
((10), (30)) STORED AS KUDU + DISTRIBUTE BY RANGE (PARTITION 10 <= VALUES <= 30) STORED AS KUDU TBLPROPERTIES('kudu.master_addresses' = 'bogus.host.name') ---- CATCH Couldn't resolve this master's address bogus.host.name:7051 @@ -19,13 +19,14 @@ Couldn't resolve this master's address bogus.host.name:7051 ---- QUERY create table tdata (id int primary key, name string, valf float, vali bigint, valv string, valb boolean) - DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU + DISTRIBUTE BY RANGE (PARTITION VALUES < 10, PARTITION 10 <= VALUES < 30, + PARTITION 30 <= VALUES) STORED AS KUDU ---- RESULTS ==== ---- QUERY insert into tdata values -(1, "martin", 1.0, 232232323, cast('a' as VARCHAR(20)), true), -(2, "david", cast(1.0 as float), 99398493939, cast('b' as CHAR(1)), false), +(1, "martin", 1.0, 232232323, cast('a' as string), true), +(2, "david", cast(1.0 as float), 99398493939, cast('b' as string), false), (3, "todd", cast(1.0 as float), 993393939, "c", true) ---- RESULTS : 3 @@ -52,7 +53,7 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN ---- QUERY # Try updating a string col where casting a value that is bigger than the varchar in the # cast. The value gets truncated and stored to the string col. 
-update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as VARCHAR(20)) where id = 1 +update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as varchar(20)) where id = 1 ---- RESULTS ---- RUNTIME_PROFILE row_regex: .*NumModifiedRows: 1.* @@ -113,8 +114,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN ==== ---- QUERY insert into tdata values -(40, "he", cast(0.0 as float), 43, cast('e' as VARCHAR(20)), false), -(120, "she", cast(0.0 as float), 99, cast('f' as VARCHAR(20)), true) +(40, "he", cast(0.0 as float), 43, cast('e' as string), false), +(120, "she", cast(0.0 as float), 99, cast('f' as string), true) ---- RESULTS : 2 ---- RUNTIME_PROFILE @@ -168,7 +169,7 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN ---- QUERY # Make sure we can insert empty strings into string columns and that we can scan them # back. -insert into tdata values (320, '', 2.0, 932, cast('' as VARCHAR(20)), false) +insert into tdata values (320, '', 2.0, 932, cast('' as string), false) ---- RESULTS : 1 ---- RUNTIME_PROFILE @@ -184,7 +185,7 @@ INT,STRING,STRING,BOOLEAN ---- QUERY -- Test that string case is ignored create table ignore_column_case (Id int, NAME string, vAlf float, vali bigint, - primary key (Id, NAME)) DISTRIBUTE BY RANGE SPLIT ROWS ((10, 'b'), (30, 'a')) + primary key (Id, NAME)) DISTRIBUTE BY RANGE (PARTITION VALUE = (1, 'Martin')) STORED AS KUDU ---- RESULTS ==== @@ -204,7 +205,7 @@ INT,STRING,FLOAT,BIGINT ==== ---- QUERY insert into tdata values -(666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true) +(666, "The Devil", cast(1.2 as float), 43, cast('z' as string), true) ---- RESULTS : 1 ---- RUNTIME_PROFILE @@ -212,13 +213,13 @@ row_regex: .*NumModifiedRows: 1.* ==== ---- QUERY insert into tdata values -(666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true) +(666, "The Devil", cast(1.2 as float), 43, cast('z' as string), true) ---- CATCH Kudu error(s) reported, first error: Already present ==== ---- QUERY insert ignore into tdata 
values -(666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true) +(666, "The Devil", cast(1.2 as float), 43, cast('z' as string), true) ---- RESULTS : 0 ---- RUNTIME_PROFILE @@ -249,7 +250,7 @@ Kudu error(s) reported, first error: Not found: key not found ---- QUERY -- Re-insert the data insert into tdata values -(666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true) +(666, "The Devil", cast(1.2 as float), 43, cast('z' as string), true) ---- RESULTS : 1 ==== @@ -289,7 +290,7 @@ TINYINT,BIGINT ==== ---- QUERY CREATE TABLE kudu_test_tbl PRIMARY KEY(id) -DISTRIBUTE BY RANGE(id) SPLIT ROWS ((100000000)) +DISTRIBUTE BY RANGE(id) (PARTITION VALUES < 100, PARTITION 100 <= VALUES <= 10000) STORED AS KUDU AS SELECT * FROM functional_kudu.alltypes WHERE id < 100; ---- RESULTS @@ -334,6 +335,36 @@ DELETE FROM kudu_test_tbl WHERE id > -1; row_regex: .*NumModifiedRows: 7300.* ==== ---- QUERY +# Insert rows that are not covered by any of the existing range partitions +INSERT INTO kudu_test_tbl SELECT cast(id + 10000 as int), bool_col, tinyint_col, + smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, + timestamp_col, year, month +FROM functional_kudu.alltypes +---- CATCH +Kudu error(s) reported, first error: Not found: No tablet covering the requested range partition: NonCoveredRange { lower_bound: (int32 id=10001), upper_bound: () +==== +---- QUERY +# Insert rows that are not covered by any of the existing range partitions +INSERT IGNORE INTO kudu_test_tbl SELECT cast(id + 10000 as int), bool_col, tinyint_col, + smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, + timestamp_col,year, month +FROM functional_kudu.alltypes +---- CATCH +Kudu error(s) reported, first error: Not found: No tablet covering the requested range partition: NonCoveredRange { lower_bound: (int32 id=10001), upper_bound: () +==== +---- QUERY +# Try to delete a row with a primary key value that is not 
covered by the existing range +# partitions +DELETE FROM kudu_test_tbl WHERE id = 10001 +---- RESULTS +==== +---- QUERY +# Try to update a row with a primary key value that is not covered by the existing range +# partitions +UPDATE kudu_test_tbl SET int_col = 10 WHERE id = 10001 +---- RESULTS +==== +---- QUERY # IMPALA-2521: clustered insert into table. create table impala_2521 (id bigint primary key, name string, zip int) @@ -366,3 +397,18 @@ select * from impala_2521 ---- TYPES BIGINT,STRING,INT ==== +---- QUERY +# Table with all supported types as primary key and distribution columns +create table allkeytypes (i1 tinyint, i2 smallint, i3 int, i4 bigint, name string, + valf float, vald double, primary key (i1, i2, i3, i4, name)) distribute by + hash into 3 buckets, range (partition value = (1,1,1,1,'1'), + partition value = (2,2,2,2,'2'), partition value = (3,3,3,3,'3')) stored as kudu +---- RESULTS +==== +---- QUERY +insert into allkeytypes select cast(id as tinyint), smallint_col, int_col, + cast (bigint_col/10 as bigint), string_col, float_col, double_col + from functional.alltypes where id > 0 and id < 4 +---- RESULTS +: 3 +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test index bd6140760..13eec9dc4 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_partition_ddl.test @@ -1,9 +1,9 @@ ==== ---- QUERY --- Test HASH partitioning +-- Test hash partitioning create table simple_hash (id int, name string, valf float, vali bigint, - PRIMARY KEY (id, name)) DISTRIBUTE BY HASH(id) INTO 4 BUCKETS, - HASH(name) INTO 2 BUCKETS STORED AS KUDU + primary key (id, name)) distribute by hash(id) INTO 4 buckets, + hash(name) INTO 2 buckets stored as kudu ---- RESULTS ==== ---- QUERY @@ -23,10 +23,109 @@ show table stats simple_hash 
INT,STRING,STRING,STRING,INT ==== ---- QUERY --- Test HASH and RANGE partitioning +-- Test single column range partitioning with bounded and unbounded partitions +create table range_part_bounds (id int, name string, valf float, vali bigint, + primary key (id, name)) distribute by range (id) + (partition values <= 10, partition 10 < values <= 20, partition 20 < values) + stored as kudu +---- RESULTS +==== +---- QUERY +show table stats range_part_bounds +---- LABELS +# Rows,Start Key,Stop Key,Leader Replica,# Replicas +---- RESULTS +-1,'','8000000B',regex:.*?:\d+,3 +-1,'8000000B','80000015',regex:.*?:\d+,3 +-1,'80000015','',regex:.*?:\d+,3 +---- TYPES +INT,STRING,STRING,STRING,INT +==== +---- QUERY +-- Test single column range partitioning with single value partitions +create table range_part_single (id int, name string, valf float, vali bigint, + primary key (id, name)) distribute by range (id) + (partition value = 1, partition value = 10, partition value = 100) + stored as kudu +---- RESULTS +==== +---- QUERY +show table stats range_part_single +---- LABELS +# Rows,Start Key,Stop Key,Leader Replica,# Replicas +---- RESULTS +-1,'80000001','80000002',regex:.*?:\d+,3 +-1,'8000000A','8000000B',regex:.*?:\d+,3 +-1,'80000064','80000065',regex:.*?:\d+,3 +---- TYPES +INT,STRING,STRING,STRING,INT +==== +---- QUERY +-- Test single column range partitioning with bounded, unbounded and single +-- value partitions +create table range_part_multiple_bounds (id int, name string, valf float, + primary key (id, name)) distribute by range (id) + (partition values <= 10, partition 10 < values <= 20, partition 20 < values <= 30, + partition value = 40, partition value = 50) stored as kudu +---- RESULTS +==== +---- QUERY +show table stats range_part_multiple_bounds +---- LABELS +# Rows,Start Key,Stop Key,Leader Replica,# Replicas +---- RESULTS +-1,'','8000000B',regex:.*?:\d+,3 +-1,'8000000B','80000015',regex:.*?:\d+,3 +-1,'80000015','8000001F',regex:.*?:\d+,3 
+-1,'80000028','80000029',regex:.*?:\d+,3 +-1,'80000032','80000033',regex:.*?:\d+,3 +---- TYPES +INT,STRING,STRING,STRING,INT +==== +---- QUERY +-- Test multiple column range partitioning +create table range_part_multiple_cols (id int, name string, valf float, vali bigint, + primary key (id, name)) distribute by range (id, name) + (partition value = (10, 'martin'), partition value = (20, 'dimitris'), + partition value = (30, 'matthew')) stored as kudu +---- RESULTS +==== +---- QUERY +show table stats range_part_multiple_cols +---- LABELS +# Rows,Start Key,Stop Key,Leader Replica,# Replicas +---- RESULTS +-1,'8000000A6D617274696E','8000000A6D617274696E00',regex:.*?:\d+,3 +-1,'8000001464696D6974726973','8000001464696D697472697300',regex:.*?:\d+,3 +-1,'8000001E6D617474686577','8000001E6D61747468657700',regex:.*?:\d+,3 +---- TYPES +INT,STRING,STRING,STRING,INT +==== +---- QUERY +-- Test single column range partitioning with string partition column +create table range_part_single_string_col (id int, name string, valf float, + primary key (id, name)) distribute by range(name) + (partition values <= 'aaa', partition 'aaa' < values <= 'bbb', + partition 'bbb' < values <= 'ccc', partition value = 'ddd') stored as kudu +---- RESULTS +==== +---- QUERY +show table stats range_part_single_string_col +---- LABELS +# Rows,Start Key,Stop Key,Leader Replica,# Replicas +---- RESULTS +-1,'','61616100',regex:.*?:\d+,3 +-1,'61616100','62626200',regex:.*?:\d+,3 +-1,'62626200','63636300',regex:.*?:\d+,3 +-1,'646464','64646400',regex:.*?:\d+,3 +---- TYPES +INT,STRING,STRING,STRING,INT +==== +---- QUERY +-- Test hash and range partitioning create table simple_hash_range (id int, name string, valf float, vali bigint, - PRIMARY KEY (id, name)) DISTRIBUTE BY HASH(id) INTO 4 BUCKETS, - RANGE(id, name) SPLIT ROWS ((10, 'martin'), (20, 'Peter')) STORED AS KUDU + primary key (id, name)) distribute by hash(id) into 4 buckets, range(id, name) + (partition value = (10, 'martin'), partition value = 
(20, 'alex')) stored as kudu ---- RESULTS ==== ---- QUERY @@ -34,25 +133,22 @@ show table stats simple_hash_range ---- LABELS # Rows,Start Key,Stop Key,Leader Replica,# Replicas ---- RESULTS --1,'','000000008000000A6D617274696E',regex:.*?:\d+,3 --1,'000000008000000A6D617274696E','00000000800000145065746572',regex:.*?:\d+,3 --1,'00000000800000145065746572','00000001',regex:.*?:\d+,3 --1,'00000001','000000018000000A6D617274696E',regex:.*?:\d+,3 --1,'000000018000000A6D617274696E','00000001800000145065746572',regex:.*?:\d+,3 --1,'00000001800000145065746572','00000002',regex:.*?:\d+,3 --1,'00000002','000000028000000A6D617274696E',regex:.*?:\d+,3 --1,'000000028000000A6D617274696E','00000002800000145065746572',regex:.*?:\d+,3 --1,'00000002800000145065746572','00000003',regex:.*?:\d+,3 --1,'00000003','000000038000000A6D617274696E',regex:.*?:\d+,3 --1,'000000038000000A6D617274696E','00000003800000145065746572',regex:.*?:\d+,3 --1,'00000003800000145065746572','',regex:.*?:\d+,3 +-1,'000000008000000A6D617274696E','000000008000000A6D617274696E00',regex:.*?:\d+,3 +-1,'0000000080000014616C6578','0000000080000014616C657800',regex:.*?:\d+,3 +-1,'000000018000000A6D617274696E','000000018000000A6D617274696E00',regex:.*?:\d+,3 +-1,'0000000180000014616C6578','0000000180000014616C657800',regex:.*?:\d+,3 +-1,'000000028000000A6D617274696E','000000028000000A6D617274696E00',regex:.*?:\d+,3 +-1,'0000000280000014616C6578','0000000280000014616C657800',regex:.*?:\d+,3 +-1,'000000038000000A6D617274696E','000000038000000A6D617274696E00',regex:.*?:\d+,3 +-1,'0000000380000014616C6578','0000000380000014616C657800',regex:.*?:\d+,3 ---- TYPES INT,STRING,STRING,STRING,INT ==== ---- QUERY create table simple_hash_range_ctas - PRIMARY KEY (id, name) DISTRIBUTE BY HASH(id) INTO 4 BUCKETS, - RANGE(id, name) SPLIT ROWS ((10, 'martin'), (20, 'Peter')) STORED AS KUDU + primary key (id, name) distribute by hash(id) into 4 buckets, + range(id, name) (partition value = (10, 'casey'), partition value = (20, 
'marcel')) + stored as kudu as select * from simple_hash ---- RESULTS 'Inserted 0 row(s)' @@ -62,25 +158,21 @@ show table stats simple_hash_range_ctas ---- LABELS # Rows,Start Key,Stop Key,Leader Replica,# Replicas ---- RESULTS --1,'','000000008000000A6D617274696E',regex:.*?:\d+,3 --1,'000000008000000A6D617274696E','00000000800000145065746572',regex:.*?:\d+,3 --1,'00000000800000145065746572','00000001',regex:.*?:\d+,3 --1,'00000001','000000018000000A6D617274696E',regex:.*?:\d+,3 --1,'000000018000000A6D617274696E','00000001800000145065746572',regex:.*?:\d+,3 --1,'00000001800000145065746572','00000002',regex:.*?:\d+,3 --1,'00000002','000000028000000A6D617274696E',regex:.*?:\d+,3 --1,'000000028000000A6D617274696E','00000002800000145065746572',regex:.*?:\d+,3 --1,'00000002800000145065746572','00000003',regex:.*?:\d+,3 --1,'00000003','000000038000000A6D617274696E',regex:.*?:\d+,3 --1,'000000038000000A6D617274696E','00000003800000145065746572',regex:.*?:\d+,3 --1,'00000003800000145065746572','',regex:.*?:\d+,3 +-1,'000000008000000A6361736579','000000008000000A636173657900',regex:.*?:\d+,3 +-1,'00000000800000146D617263656C','00000000800000146D617263656C00',regex:.*?:\d+,3 +-1,'000000018000000A6361736579','000000018000000A636173657900',regex:.*?:\d+,3 +-1,'00000001800000146D617263656C','00000001800000146D617263656C00',regex:.*?:\d+,3 +-1,'000000028000000A6361736579','000000028000000A636173657900',regex:.*?:\d+,3 +-1,'00000002800000146D617263656C','00000002800000146D617263656C00',regex:.*?:\d+,3 +-1,'000000038000000A6361736579','000000038000000A636173657900',regex:.*?:\d+,3 +-1,'00000003800000146D617263656C','00000003800000146D617263656C00',regex:.*?:\d+,3 ---- TYPES INT,STRING,STRING,STRING,INT ==== ---- QUERY --- Test HASH defaults to all columns +-- Test hash defaults to all columns create table simple_hash_all_columns (id int, name string, valf float, vali bigint, - PRIMARY KEY (id, name)) DISTRIBUTE BY HASH INTO 4 BUCKETS STORED AS KUDU + primary key (id, name)) 
distribute by hash into 4 buckets stored as kudu ---- RESULTS ==== ---- QUERY @@ -96,10 +188,11 @@ show table stats simple_hash_all_columns INT,STRING,STRING,STRING,INT ==== ---- QUERY --- Test RANGE defaults to all columns +-- Test range defaults to all columns create table simple_range_all_columns (id int, name string, valf float, vali bigint, - PRIMARY KEY (id, name)) DISTRIBUTE BY RANGE SPLIT ROWS ((1, 'a'), (2, 'b')) - STORED AS KUDU + primary key (id, name)) distribute by range + (partition value = (1, 'a'), partition value = (2, 'b')) + stored as kudu ---- RESULTS ==== ---- QUERY @@ -107,9 +200,52 @@ show table stats simple_range_all_columns ---- LABELS # Rows,Start Key,Stop Key,Leader Replica,# Replicas ---- RESULTS --1,'','8000000161',regex:.*?:\d+,3 --1,'8000000161','8000000262',regex:.*?:\d+,3 --1,'8000000262','',regex:.*?:\d+,3 +-1,'8000000161','800000016100',regex:.*?:\d+,3 +-1,'8000000262','800000026200',regex:.*?:\d+,3 ---- TYPES INT,STRING,STRING,STRING,INT ==== +---- QUERY +-- Test using non-literal constant values in range-partition bounds +create table range_complex_const_boundary_vals (x int, y int, primary key (x)) + distribute by range (x) (partition values < 1 + 1, partition (1+3) + 2 < values < 10, + partition factorial(4) < values < factorial(5), partition value = factorial(6)) + stored as kudu +---- RESULTS +==== +---- QUERY +show table stats range_complex_const_boundary_vals +---- LABELS +# Rows,Start Key,Stop Key,Leader Replica,# Replicas +---- RESULTS +-1,'','80000002',regex:.*?:\d+,3 +-1,'80000007','8000000A',regex:.*?:\d+,3 +-1,'80000019','80000078',regex:.*?:\d+,3 +-1,'800002D0','800002D1',regex:.*?:\d+,3 +---- TYPES +INT,STRING,STRING,STRING,INT +==== +---- QUERY +-- Test range partitioning with overlapping partitions +create table simple_range_with_overlapping (id int, name string, valf float, vali bigint, + primary key (id, name)) distribute by range (id) + (partition values <= 10, partition values < 20, partition value = 5) 
stored as kudu +---- CATCH +NonRecoverableException: overlapping range partitions: first range partition: [, (int32 id=11)), second range partition: [, (int32 id=20)) +==== +---- QUERY +-- Test range partitioning with the same partition specified multiple times +create table simple_range_duplicate_parts (id int, name string, valf float, vali bigint, + primary key(id, name)) distribute by range (id) + (partition 10 < values <= 20, partition 10 < values <= 20) stored as kudu +---- CATCH +NonRecoverableException: overlapping range partitions: first range partition: [(int32 id=11), (int32 id=21)), second range partition: [(int32 id=11), (int32 id=21)) +==== +---- QUERY +-- Test multi-column range partitioning with the same partition specified multiple times +create table range_multi_col_duplicate_parts (id int, name string, valf float, + vali bigint, primary key (id, name)) distribute by range (id, name) + (partition value = (10, 'dimitris'), partition value = (10, 'dimitris')) stored as kudu +---- CATCH +NonRecoverableException: overlapping range partitions: first range partition: [(int32 id=10, string name=dimitris), (int32 id=10, string name=dimitris\000)), second range partition: [(int32 id=10, string name=dimitris), (int32 id=10, string name=dimitris\000)) +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test index 589bbf008..691494481 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_stats.test @@ -1,8 +1,8 @@ ==== ---- QUERY create table simple (id int primary key, name string, valf float, vali bigint) - DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU - TBLPROPERTIES('kudu.num_tablet_replicas' = '2') + distribute by range (partition values < 10, partition 10 <= values < 30, + partition 30 <= values) stored as kudu 
tblproperties('kudu.num_tablet_replicas' = '2') ---- RESULTS ==== ---- QUERY diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py index 56e69645b..2a881370a 100644 --- a/tests/query_test/test_kudu.py +++ b/tests/query_test/test_kudu.py @@ -230,15 +230,16 @@ class TestShowCreateTable(KuduTestSuite): self.assert_show_create_equals(cursor, """ CREATE TABLE {table} (c INT PRIMARY KEY, d STRING) - DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) SPLIT ROWS ((1), (2)) - STORED AS KUDU""", + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) + (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2, + PARTITION 2 < VALUES) STORED AS KUDU""", """ CREATE TABLE {db}.{{table}} ( c INT, d STRING, PRIMARY KEY (c) ) - DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) SPLIT ROWS (...) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) (...) STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) @@ -259,21 +260,23 @@ class TestShowCreateTable(KuduTestSuite): """ CREATE TABLE {table} (c INT, d STRING, PRIMARY KEY(c, d)) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, - RANGE (c, d) SPLIT ROWS ((1, 'aaa'), (2, 'bbb')) STORED AS KUDU""", + RANGE (c, d) (PARTITION VALUE = (1, 'aaa'), PARTITION VALUE = (2, 'bbb')) + STORED AS KUDU""", """ CREATE TABLE {db}.{{table}} ( c INT, d STRING, PRIMARY KEY (c, d) ) - DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, RANGE (c, d) SPLIT ROWS (...) + DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, RANGE (c, d) (...) 
STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) self.assert_show_create_equals(cursor, """ CREATE TABLE {table} (c INT, d STRING, e INT, PRIMARY KEY(c, d)) - DISTRIBUTE BY RANGE (c) SPLIT ROWS ((1), (2), (3)) STORED AS KUDU""", + DISTRIBUTE BY RANGE (c) (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2, + PARTITION 2 < VALUES <= 3, PARTITION 3 < VALUES) STORED AS KUDU""", """ CREATE TABLE {db}.{{table}} ( c INT, @@ -281,7 +284,7 @@ class TestShowCreateTable(KuduTestSuite): e INT, PRIMARY KEY (c, d) ) - DISTRIBUTE BY RANGE (c) SPLIT ROWS (...) + DISTRIBUTE BY RANGE (c) (...) STORED AS KUDU TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format( db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))