diff --git a/be/src/exec/kudu-testutil.h b/be/src/exec/kudu-testutil.h index f748eb8dc..1fd2c419e 100644 --- a/be/src/exec/kudu-testutil.h +++ b/be/src/exec/kudu-testutil.h @@ -104,6 +104,7 @@ class KuduTestHelper { .schema(&test_schema_) .num_replicas(3) .split_rows(splits) + .set_range_partition_columns(boost::assign::list_of("key")) .Create(); if (s.IsAlreadyPresent()) { LOG(INFO) << "Table existed, deleting. " << table_name_; diff --git a/bin/impala-config.sh b/bin/impala-config.sh index 5983e87c4..b1f49c47a 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -259,7 +259,7 @@ export IMPALA_GFLAGS_VERSION=2.0 export IMPALA_GLOG_VERSION=0.3.2-p2 export IMPALA_GPERFTOOLS_VERSION=2.5 export IMPALA_GTEST_VERSION=1.6.0 -export IMPALA_KUDU_VERSION=0.8.0-RC1 +export IMPALA_KUDU_VERSION=0.10.0-RC1 export IMPALA_LLVM_VERSION=3.8.0-p1 export IMPALA_LLVM_ASAN_VERSION=3.8.0-p1 # Debug builds should use the release+asserts build to get additional coverage. @@ -285,7 +285,7 @@ export KUDU_MASTER_PORT=${KUDU_MASTER_PORT:-"7051"} # TODO: Figure out a way to use a snapshot version without causing a lot of breakage due # to nightly changes from Kudu. The version below is the last released version but # before release this needs to be updated to the version about to be released. -export KUDU_JAVA_VERSION=0.6.0 +export KUDU_JAVA_VERSION=0.10.0-SNAPSHOT if [[ $OSTYPE == "darwin"* ]]; then IMPALA_CYRUS_SASL_VERSION=2.1.26 diff --git a/fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java b/fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java index 7cd03d2a4..f7b683f2f 100644 --- a/fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java +++ b/fe/src/main/java/com/cloudera/impala/analysis/CreateTableStmt.java @@ -24,13 +24,10 @@ import java.util.Set; import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; -import com.cloudera.impala.util.KuduUtil; import org.apache.hadoop.fs.permission.FsAction; import com.cloudera.impala.authorization.Privilege; -import com.cloudera.impala.catalog.HdfsFileFormat; import com.cloudera.impala.catalog.HdfsStorageDescriptor; -import com.cloudera.impala.catalog.HdfsTable; import com.cloudera.impala.catalog.KuduTable; import com.cloudera.impala.catalog.RowFormat; import com.cloudera.impala.common.AnalysisException; @@ -43,6 +40,7 @@ import com.cloudera.impala.thrift.TTableName; import com.cloudera.impala.util.AvroSchemaConverter; import com.cloudera.impala.util.AvroSchemaParser; import com.cloudera.impala.util.AvroSchemaUtils; +import com.cloudera.impala.util.KuduUtil; import com.cloudera.impala.util.MetaStoreUtil; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -376,6 +374,11 @@ public class CreateTableStmt extends StatementBase { } if (distributeParams_ != null) { + if (isExternal_) { + throw new AnalysisException( + "The DISTRIBUTE BY clause may not be specified for external tables."); + } + List keyColumns = KuduUtil.parseKeyColumnsAsList( getTblProperties().get(KuduTable.KEY_KEY_COLUMNS)); for (DistributeParam d : distributeParams_) { @@ -383,6 +386,9 @@ public class CreateTableStmt extends StatementBase { if (d.getColumns() == null) d.setColumns(keyColumns); d.analyze(analyzer); } + } else if (!isExternal_) { + throw new AnalysisException( + "A data distribution must be specified using the DISTRIBUTE BY clause."); } } diff --git a/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java b/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java index d4b89ded1..919eebb62 100644 --- a/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java +++ b/fe/src/main/java/com/cloudera/impala/catalog/delegates/KuduDdlDelegate.java @@ -17,20 +17,17 @@ package com.cloudera.impala.catalog.delegates; +import static com.cloudera.impala.util.KuduUtil.compareSchema; +import static com.cloudera.impala.util.KuduUtil.fromImpalaType; +import static com.cloudera.impala.util.KuduUtil.parseKeyColumns; + import java.util.ArrayList; import java.util.HashSet; import java.util.List; -import com.cloudera.impala.thrift.TDistributeParam; -import com.cloudera.impala.thrift.TDistributeType; -import com.cloudera.impala.util.KuduUtil; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.kududb.ColumnSchema; import org.kududb.ColumnSchema.ColumnSchemaBuilder; import org.kududb.Schema; @@ -38,15 +35,17 @@ import org.kududb.Type; import org.kududb.client.CreateTableOptions; import org.kududb.client.KuduClient; import org.kududb.client.PartialRow; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.cloudera.impala.catalog.KuduTable; import com.cloudera.impala.common.ImpalaRuntimeException; +import com.cloudera.impala.thrift.TDistributeParam; +import com.cloudera.impala.util.KuduUtil; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.collect.Lists; -import static com.cloudera.impala.util.KuduUtil.compareSchema; -import static com.cloudera.impala.util.KuduUtil.fromImpalaType; -import static com.cloudera.impala.util.KuduUtil.parseKeyColumns; - /** * Implementation of the Kudu DDL Delegate. Propagates create and drop table statements to @@ -174,8 +173,6 @@ public class KuduDdlDelegate extends DdlDelegate { } client.deleteTable(kuduTableName); return; - } catch (ImpalaRuntimeException e) { - throw e; } catch (Exception e) { throw new ImpalaRuntimeException("Error dropping Kudu table", e); } diff --git a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java index e89a7e036..6e57ccbe9 100644 --- a/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/com/cloudera/impala/analysis/AnalyzeDDLTest.java @@ -1671,7 +1671,8 @@ public class AnalyzeDDLTest extends FrontendTestBase { public void TestCreateKuduTable() { TestUtils.assumeKuduIsSupported(); // Create Kudu Table with all required properties - AnalyzesOk("create table tab (x int) tblproperties (" + + AnalyzesOk("create table tab (x int) " + + "distribute by hash into 2 buckets tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.table_name'='tab'," + "'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081', " + @@ -1679,7 +1680,8 @@ public class AnalyzeDDLTest extends FrontendTestBase { ")"); // Check that all properties are present - AnalysisError("create table tab (x int) tblproperties (" + + AnalysisError("create table tab (x int) " + + "distribute by hash into 2 buckets tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.master_addresses' = '127.0.0.1:8080', " + "'kudu.key_columns' = 'a,b,c'" + @@ -1688,7 +1690,8 @@ public class AnalyzeDDLTest extends FrontendTestBase { "if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " + "present and have valid values."); - AnalysisError("create table tab (x int) tblproperties (" + + AnalysisError("create table tab (x int) " + + "distribute by hash into 2 buckets tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.table_name'='tab'," + "'kudu.key_columns' = 'a,b,c'" @@ -1697,7 +1700,8 @@ public class AnalyzeDDLTest extends FrontendTestBase { "if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " + "present and have valid values."); - AnalysisError("create table tab (x int) tblproperties (" + + AnalysisError("create table tab (x int) " + + "distribute by hash into 2 buckets tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.table_name'='tab'," + "'kudu.master_addresses' = '127.0.0.1:8080'" + @@ -1707,7 +1711,8 @@ public class AnalyzeDDLTest extends FrontendTestBase { "present and have valid values."); // Check that properties are not empty - AnalysisError("create table tab (x int) tblproperties (" + + AnalysisError("create table tab (x int) " + + "distribute by hash into 2 buckets tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.table_name'=''," + "'kudu.master_addresses' = '127.0.0.1:8080', " + @@ -1717,7 +1722,8 @@ public class AnalyzeDDLTest extends FrontendTestBase { "if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " + "present and have valid values."); - AnalysisError("create table tab (x int) tblproperties (" + + AnalysisError("create table tab (x int) " + + "distribute by hash into 2 buckets tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.table_name'='asd'," + "'kudu.master_addresses' = '', " + @@ -1728,7 +1734,8 @@ public class AnalyzeDDLTest extends FrontendTestBase { "present and have valid values."); // Don't allow caching - AnalysisError("create table tab (x int) cached in 'testPool' tblproperties (" + + AnalysisError("create table tab (x int) cached in 'testPool' " + + "distribute by hash into 2 buckets tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.table_name'='tab'," + "'kudu.master_addresses' = '127.0.0.1:8080', " + @@ -1746,7 +1753,7 @@ public class AnalyzeDDLTest extends FrontendTestBase { ")"); AnalyzesOk("create table tab (a int, b int, c int, d int) " + - " distribute by hash into 8 buckets " + + "distribute by hash into 8 buckets " + "tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.table_name'='tab'," + @@ -1754,9 +1761,28 @@ public class AnalyzeDDLTest extends FrontendTestBase { "'kudu.key_columns' = 'a,b,c'" + ")"); + // DISTRIBUTE BY is required for managed tables. + AnalysisError("create table tab (a int) tblproperties (" + + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + + "'kudu.table_name'='tab'," + + "'kudu.master_addresses' = '127.0.0.1:8080', " + + "'kudu.key_columns' = 'a'" + + ")", + "A data distribution must be specified using the DISTRIBUTE BY clause."); + + // DISTRIBUTE BY is not allowed for external tables. + AnalysisError("create external table tab (a int) " + + "distribute by hash into 3 buckets tblproperties (" + + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + + "'kudu.table_name'='tab'," + + "'kudu.master_addresses' = '127.0.0.1:8080', " + + "'kudu.key_columns' = 'a'" + + ")", + "The DISTRIBUTE BY clause may not be specified for external tables."); + // Number of buckets must be larger 1 AnalysisError("create table tab (a int, b int, c int, d int) " + - " distribute by hash(a,b) into 8 buckets, hash(c) into 1 buckets " + + "distribute by hash(a,b) into 8 buckets, hash(c) into 1 buckets " + "tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " + "'kudu.table_name'='tab'," + diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py index f52ac9151..ab38f5c53 100755 --- a/testdata/bin/generate-schema-statements.py +++ b/testdata/bin/generate-schema-statements.py @@ -221,6 +221,10 @@ def build_table_template(file_format, columns, partition_columns, row_format, elif file_format == 'parquet': row_format_stmt = str() elif file_format == 'kudu': + # Use partitioned_by to set a trivial hash distribution + assert not partitioned_by, "Kudu table shouldn't have partition cols defined" + partitioned_by = "distribute by hash into 3 buckets" + # Fetch KUDU host and port from environment kudu_master = os.getenv("KUDU_MASTER_ADDRESS", "127.0.0.1") kudu_master_port = os.getenv("KUDU_MASTER_PORT", "7051") diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test index 7d139d2c4..35abe35a2 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu.test @@ -3,7 +3,9 @@ select * from functional_kudu.testtbl 00:SCAN KUDU [functional_kudu.testtbl] ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE []:[] + KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] + KUDU KEYRANGE [0, 0, 0, 2]:[] + KUDU KEYRANGE []:[0, 0, 0, 1] ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | @@ -15,7 +17,9 @@ select * from functional_kudu.testtbl where name = '10' kudu predicates: name = '10' ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE []:[] + KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] + KUDU KEYRANGE [0, 0, 0, 2]:[] + KUDU KEYRANGE []:[0, 0, 0, 1] ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | @@ -90,7 +94,9 @@ and zip > 1 and zip < 50 kudu predicates: id >= 10, zip <= 5, id <= 20, zip >= 0, zip <= 30, zip >= 2, zip <= 49, name = 'foo' ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE []:[] + KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] + KUDU KEYRANGE [0, 0, 0, 2]:[] + KUDU KEYRANGE []:[0, 0, 0, 1] ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | @@ -106,7 +112,9 @@ where id < 10 + 30 and cast(sin(id) as boolean) = true and 20 * 3 >= id and 10 kudu predicates: id <= 39, id <= 60, id <= 102 ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE []:[] + KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] + KUDU KEYRANGE [0, 0, 0, 2]:[] + KUDU KEYRANGE []:[0, 0, 0, 1] ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | @@ -123,7 +131,9 @@ where cast(sin(id) as boolean) = true and name = 'a' kudu predicates: name = 'a' ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE []:[] + KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] + KUDU KEYRANGE [0, 0, 0, 2]:[] + KUDU KEYRANGE []:[0, 0, 0, 1] ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | @@ -140,7 +150,9 @@ where cast(sin(id) as boolean) = true and name is null predicates: name IS NULL, CAST(sin(id) AS BOOLEAN) = TRUE ---- SCANRANGELOCATIONS NODE 0: - KUDU KEYRANGE []:[] + KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2] + KUDU KEYRANGE [0, 0, 0, 2]:[] + KUDU KEYRANGE []:[0, 0, 0, 1] ---- DISTRIBUTEDPLAN 01:EXCHANGE [UNPARTITIONED] | diff --git a/testdata/workloads/functional-query/queries/QueryTest/create_kudu.test b/testdata/workloads/functional-query/queries/QueryTest/create_kudu.test index fbf2088b6..835e2737d 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/create_kudu.test +++ b/testdata/workloads/functional-query/queries/QueryTest/create_kudu.test @@ -3,6 +3,7 @@ # Create managed Kudu table create table managed_kudu ( id int, f float, d double, s string, v varchar(10), t tinyint, m smallint ) +distribute by hash into 3 buckets tblproperties ( 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler', diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test index 089b45ecb..8aa145757 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-scan-node.test @@ -22,6 +22,7 @@ INT, BIGINT # carry over into the next unfiltered row (the result below would incorrectly be 2,NULL). USE kududb_test; CREATE TABLE impala_2740 (key INT, value INT) + DISTRIBUTE BY HASH INTO 3 BUCKETS TBLPROPERTIES( 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler', 'kudu.table_name' = 'impala_2740', @@ -40,6 +41,7 @@ INT, INT # is run on all impalads. However, for the t1 table there is only as single scan range, # so two of the scan instances get empty scan ranges. CREATE TABLE impala_2635_t1 (id BIGINT, name STRING) + DISTRIBUTE BY RANGE SPLIT ROWS ((0)) TBLPROPERTIES( 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler', 'kudu.table_name' = 'impala_2635_t1', diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-show-create.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-show-create.test index 60fb110c1..b07efb749 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu-show-create.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-show-create.test @@ -7,5 +7,10 @@ CREATE TABLE functional_kudu.dimtbl ( name STRING, zip INT ) -TBLPROPERTIES ('kudu.master_addresses'='127.0.0.1:7051', 'kudu.key_columns'='id', 'kudu.table_name'='dimtbl', 'transient_lastDdlTime'='1441325601', 'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler') -==== \ No newline at end of file +TBLPROPERTIES ( + 'kudu.master_addresses'='127.0.0.1:7051', + 'kudu.key_columns'='id', + 'kudu.table_name'='dimtbl', + 'transient_lastDdlTime'='1441325601', + 'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler') +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test index ce0ca1a74..b32e0d060 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_alter.test @@ -1,6 +1,7 @@ ==== ---- QUERY create table simple (id int, name string, valf float, vali bigint) +distribute by hash into 3 buckets TBLPROPERTIES( 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler', 'kudu.table_name' = 'query_test_simple', @@ -39,4 +40,4 @@ select count(*) from simple_new; 0 ---- TYPES BIGINT -==== \ No newline at end of file +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test index d58c55ee5..8a583759f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test @@ -261,6 +261,7 @@ delete ignore a from tdata a, tdata b where a.id = 666 # if the Kudu columns are of different types. create table impala_3454 (key_1 tinyint, key_2 bigint) +DISTRIBUTE BY HASH INTO 3 BUCKETS TBLPROPERTIES( 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler', 'kudu.table_name' = 'impala_3454', diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py index 2f376e375..d79160880 100644 --- a/tests/query_test/test_kudu.py +++ b/tests/query_test/test_kudu.py @@ -130,6 +130,7 @@ class TestKuduMemLimits(ImpalaTestSuite): l_shipmode STRING, l_comment STRING ) + DISTRIBUTE BY HASH (l_orderkey) INTO 9 BUCKETS TBLPROPERTIES( 'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler', 'kudu.table_name' = 'tpch_lineitem',