IMPALA-3650: DISTRIBUTE BY required for managed Kudu tables

As of Kudu 0.9, DISTRIBUTE BY is now required when creating
a new Kudu table. Create table analysis, data loading, and
tests are updated to reflect this.

This also bumps the Kudu version to 0.10.0.

Change-Id: Ieb15110b10b28ef6dd8ec136c2522b5f44dca43e
Reviewed-on: http://gerrit.cloudera.org:8080/3987
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Internal Jenkins
This commit is contained in:
Matthew Jacobs
2016-08-17 11:28:49 -07:00
committed by Internal Jenkins
parent 0849827147
commit d113205cee
13 changed files with 93 additions and 36 deletions

View File

@@ -104,6 +104,7 @@ class KuduTestHelper {
.schema(&test_schema_)
.num_replicas(3)
.split_rows(splits)
.set_range_partition_columns(boost::assign::list_of("key"))
.Create();
if (s.IsAlreadyPresent()) {
LOG(INFO) << "Table existed, deleting. " << table_name_;

View File

@@ -259,7 +259,7 @@ export IMPALA_GFLAGS_VERSION=2.0
export IMPALA_GLOG_VERSION=0.3.2-p2
export IMPALA_GPERFTOOLS_VERSION=2.5
export IMPALA_GTEST_VERSION=1.6.0
export IMPALA_KUDU_VERSION=0.8.0-RC1
export IMPALA_KUDU_VERSION=0.10.0-RC1
export IMPALA_LLVM_VERSION=3.8.0-p1
export IMPALA_LLVM_ASAN_VERSION=3.8.0-p1
# Debug builds should use the release+asserts build to get additional coverage.
@@ -285,7 +285,7 @@ export KUDU_MASTER_PORT=${KUDU_MASTER_PORT:-"7051"}
# TODO: Figure out a way to use a snapshot version without causing a lot of breakage due
# to nightly changes from Kudu. The version below is the last released version but
# before release this needs to be updated to the version about to be released.
export KUDU_JAVA_VERSION=0.6.0
export KUDU_JAVA_VERSION=0.10.0-SNAPSHOT
if [[ $OSTYPE == "darwin"* ]]; then
IMPALA_CYRUS_SASL_VERSION=2.1.26

View File

@@ -24,13 +24,10 @@ import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import com.cloudera.impala.util.KuduUtil;
import org.apache.hadoop.fs.permission.FsAction;
import com.cloudera.impala.authorization.Privilege;
import com.cloudera.impala.catalog.HdfsFileFormat;
import com.cloudera.impala.catalog.HdfsStorageDescriptor;
import com.cloudera.impala.catalog.HdfsTable;
import com.cloudera.impala.catalog.KuduTable;
import com.cloudera.impala.catalog.RowFormat;
import com.cloudera.impala.common.AnalysisException;
@@ -43,6 +40,7 @@ import com.cloudera.impala.thrift.TTableName;
import com.cloudera.impala.util.AvroSchemaConverter;
import com.cloudera.impala.util.AvroSchemaParser;
import com.cloudera.impala.util.AvroSchemaUtils;
import com.cloudera.impala.util.KuduUtil;
import com.cloudera.impala.util.MetaStoreUtil;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -376,6 +374,11 @@ public class CreateTableStmt extends StatementBase {
}
if (distributeParams_ != null) {
if (isExternal_) {
throw new AnalysisException(
"The DISTRIBUTE BY clause may not be specified for external tables.");
}
List<String> keyColumns = KuduUtil.parseKeyColumnsAsList(
getTblProperties().get(KuduTable.KEY_KEY_COLUMNS));
for (DistributeParam d : distributeParams_) {
@@ -383,6 +386,9 @@ public class CreateTableStmt extends StatementBase {
if (d.getColumns() == null) d.setColumns(keyColumns);
d.analyze(analyzer);
}
} else if (!isExternal_) {
throw new AnalysisException(
"A data distribution must be specified using the DISTRIBUTE BY clause.");
}
}

View File

@@ -17,20 +17,17 @@
package com.cloudera.impala.catalog.delegates;
import static com.cloudera.impala.util.KuduUtil.compareSchema;
import static com.cloudera.impala.util.KuduUtil.fromImpalaType;
import static com.cloudera.impala.util.KuduUtil.parseKeyColumns;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import com.cloudera.impala.thrift.TDistributeParam;
import com.cloudera.impala.thrift.TDistributeType;
import com.cloudera.impala.util.KuduUtil;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.kududb.ColumnSchema;
import org.kududb.ColumnSchema.ColumnSchemaBuilder;
import org.kududb.Schema;
@@ -38,15 +35,17 @@ import org.kududb.Type;
import org.kududb.client.CreateTableOptions;
import org.kududb.client.KuduClient;
import org.kududb.client.PartialRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudera.impala.catalog.KuduTable;
import com.cloudera.impala.common.ImpalaRuntimeException;
import com.cloudera.impala.thrift.TDistributeParam;
import com.cloudera.impala.util.KuduUtil;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import static com.cloudera.impala.util.KuduUtil.compareSchema;
import static com.cloudera.impala.util.KuduUtil.fromImpalaType;
import static com.cloudera.impala.util.KuduUtil.parseKeyColumns;
/**
* Implementation of the Kudu DDL Delegate. Propagates create and drop table statements to
@@ -174,8 +173,6 @@ public class KuduDdlDelegate extends DdlDelegate {
}
client.deleteTable(kuduTableName);
return;
} catch (ImpalaRuntimeException e) {
throw e;
} catch (Exception e) {
throw new ImpalaRuntimeException("Error dropping Kudu table", e);
}

View File

@@ -1671,7 +1671,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
public void TestCreateKuduTable() {
TestUtils.assumeKuduIsSupported();
// Create Kudu Table with all required properties
AnalyzesOk("create table tab (x int) tblproperties (" +
AnalyzesOk("create table tab (x int) " +
"distribute by hash into 2 buckets tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='tab'," +
"'kudu.master_addresses' = '127.0.0.1:8080, 127.0.0.1:8081', " +
@@ -1679,7 +1680,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
")");
// Check that all properties are present
AnalysisError("create table tab (x int) tblproperties (" +
AnalysisError("create table tab (x int) " +
"distribute by hash into 2 buckets tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.master_addresses' = '127.0.0.1:8080', " +
"'kudu.key_columns' = 'a,b,c'" +
@@ -1688,7 +1690,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " +
"present and have valid values.");
AnalysisError("create table tab (x int) tblproperties (" +
AnalysisError("create table tab (x int) " +
"distribute by hash into 2 buckets tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='tab'," +
"'kudu.key_columns' = 'a,b,c'"
@@ -1697,7 +1700,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " +
"present and have valid values.");
AnalysisError("create table tab (x int) tblproperties (" +
AnalysisError("create table tab (x int) " +
"distribute by hash into 2 buckets tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='tab'," +
"'kudu.master_addresses' = '127.0.0.1:8080'" +
@@ -1707,7 +1711,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"present and have valid values.");
// Check that properties are not empty
AnalysisError("create table tab (x int) tblproperties (" +
AnalysisError("create table tab (x int) " +
"distribute by hash into 2 buckets tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'=''," +
"'kudu.master_addresses' = '127.0.0.1:8080', " +
@@ -1717,7 +1722,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"if kudu.table_name, kudu.master_addresses, and kudu.key_columns are " +
"present and have valid values.");
AnalysisError("create table tab (x int) tblproperties (" +
AnalysisError("create table tab (x int) " +
"distribute by hash into 2 buckets tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='asd'," +
"'kudu.master_addresses' = '', " +
@@ -1728,7 +1734,8 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"present and have valid values.");
// Don't allow caching
AnalysisError("create table tab (x int) cached in 'testPool' tblproperties (" +
AnalysisError("create table tab (x int) cached in 'testPool' " +
"distribute by hash into 2 buckets tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='tab'," +
"'kudu.master_addresses' = '127.0.0.1:8080', " +
@@ -1746,7 +1753,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
")");
AnalyzesOk("create table tab (a int, b int, c int, d int) " +
" distribute by hash into 8 buckets " +
"distribute by hash into 8 buckets " +
"tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='tab'," +
@@ -1754,9 +1761,28 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"'kudu.key_columns' = 'a,b,c'" +
")");
// DISTRIBUTE BY is required for managed tables.
AnalysisError("create table tab (a int) tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='tab'," +
"'kudu.master_addresses' = '127.0.0.1:8080', " +
"'kudu.key_columns' = 'a'" +
")",
"A data distribution must be specified using the DISTRIBUTE BY clause.");
// DISTRIBUTE BY is not allowed for external tables.
AnalysisError("create external table tab (a int) " +
"distribute by hash into 3 buckets tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='tab'," +
"'kudu.master_addresses' = '127.0.0.1:8080', " +
"'kudu.key_columns' = 'a'" +
")",
"The DISTRIBUTE BY clause may not be specified for external tables.");
// Number of buckets must be larger 1
AnalysisError("create table tab (a int, b int, c int, d int) " +
" distribute by hash(a,b) into 8 buckets, hash(c) into 1 buckets " +
"distribute by hash(a,b) into 8 buckets, hash(c) into 1 buckets " +
"tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler', " +
"'kudu.table_name'='tab'," +

View File

@@ -221,6 +221,10 @@ def build_table_template(file_format, columns, partition_columns, row_format,
elif file_format == 'parquet':
row_format_stmt = str()
elif file_format == 'kudu':
# Use partitioned_by to set a trivial hash distribution
assert not partitioned_by, "Kudu table shouldn't have partition cols defined"
partitioned_by = "distribute by hash into 3 buckets"
# Fetch KUDU host and port from environment
kudu_master = os.getenv("KUDU_MASTER_ADDRESS", "127.0.0.1")
kudu_master_port = os.getenv("KUDU_MASTER_PORT", "7051")

View File

@@ -3,7 +3,9 @@ select * from functional_kudu.testtbl
00:SCAN KUDU [functional_kudu.testtbl]
---- SCANRANGELOCATIONS
NODE 0:
KUDU KEYRANGE []:[]
KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2]
KUDU KEYRANGE [0, 0, 0, 2]:[]
KUDU KEYRANGE []:[0, 0, 0, 1]
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -15,7 +17,9 @@ select * from functional_kudu.testtbl where name = '10'
kudu predicates: name = '10'
---- SCANRANGELOCATIONS
NODE 0:
KUDU KEYRANGE []:[]
KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2]
KUDU KEYRANGE [0, 0, 0, 2]:[]
KUDU KEYRANGE []:[0, 0, 0, 1]
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -90,7 +94,9 @@ and zip > 1 and zip < 50
kudu predicates: id >= 10, zip <= 5, id <= 20, zip >= 0, zip <= 30, zip >= 2, zip <= 49, name = 'foo'
---- SCANRANGELOCATIONS
NODE 0:
KUDU KEYRANGE []:[]
KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2]
KUDU KEYRANGE [0, 0, 0, 2]:[]
KUDU KEYRANGE []:[0, 0, 0, 1]
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -106,7 +112,9 @@ where id < 10 + 30 and cast(sin(id) as boolean) = true and 20 * 3 >= id and 10
kudu predicates: id <= 39, id <= 60, id <= 102
---- SCANRANGELOCATIONS
NODE 0:
KUDU KEYRANGE []:[]
KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2]
KUDU KEYRANGE [0, 0, 0, 2]:[]
KUDU KEYRANGE []:[0, 0, 0, 1]
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -123,7 +131,9 @@ where cast(sin(id) as boolean) = true and name = 'a'
kudu predicates: name = 'a'
---- SCANRANGELOCATIONS
NODE 0:
KUDU KEYRANGE []:[]
KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2]
KUDU KEYRANGE [0, 0, 0, 2]:[]
KUDU KEYRANGE []:[0, 0, 0, 1]
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -140,7 +150,9 @@ where cast(sin(id) as boolean) = true and name is null
predicates: name IS NULL, CAST(sin(id) AS BOOLEAN) = TRUE
---- SCANRANGELOCATIONS
NODE 0:
KUDU KEYRANGE []:[]
KUDU KEYRANGE [0, 0, 0, 1]:[0, 0, 0, 2]
KUDU KEYRANGE [0, 0, 0, 2]:[]
KUDU KEYRANGE []:[0, 0, 0, 1]
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|

View File

@@ -3,6 +3,7 @@
# Create managed Kudu table
create table managed_kudu
( id int, f float, d double, s string, v varchar(10), t tinyint, m smallint )
distribute by hash into 3 buckets
tblproperties
(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',

View File

@@ -22,6 +22,7 @@ INT, BIGINT
# carry over into the next unfiltered row (the result below would incorrectly be 2,NULL).
USE kududb_test;
CREATE TABLE impala_2740 (key INT, value INT)
DISTRIBUTE BY HASH INTO 3 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'impala_2740',
@@ -40,6 +41,7 @@ INT, INT
# is run on all impalads. However, for the t1 table there is only as single scan range,
# so two of the scan instances get empty scan ranges.
CREATE TABLE impala_2635_t1 (id BIGINT, name STRING)
DISTRIBUTE BY RANGE SPLIT ROWS ((0))
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'impala_2635_t1',

View File

@@ -7,5 +7,10 @@ CREATE TABLE functional_kudu.dimtbl (
name STRING,
zip INT
)
TBLPROPERTIES ('kudu.master_addresses'='127.0.0.1:7051', 'kudu.key_columns'='id', 'kudu.table_name'='dimtbl', 'transient_lastDdlTime'='1441325601', 'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')
====
TBLPROPERTIES (
'kudu.master_addresses'='127.0.0.1:7051',
'kudu.key_columns'='id',
'kudu.table_name'='dimtbl',
'transient_lastDdlTime'='1441325601',
'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')
====

View File

@@ -1,6 +1,7 @@
====
---- QUERY
create table simple (id int, name string, valf float, vali bigint)
distribute by hash into 3 buckets
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'query_test_simple',
@@ -39,4 +40,4 @@ select count(*) from simple_new;
0
---- TYPES
BIGINT
====
====

View File

@@ -261,6 +261,7 @@ delete ignore a from tdata a, tdata b where a.id = 666
# if the Kudu columns are of different types.
create table impala_3454
(key_1 tinyint, key_2 bigint)
DISTRIBUTE BY HASH INTO 3 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'impala_3454',

View File

@@ -130,6 +130,7 @@ class TestKuduMemLimits(ImpalaTestSuite):
l_shipmode STRING,
l_comment STRING
)
DISTRIBUTE BY HASH (l_orderkey) INTO 9 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'tpch_lineitem',