IMPALA-4379: Fix and test Kudu table type checking

Creating Kudu tables shouldn't allow types not supported by
Kudu (e.g. VARCHAR/CHAR, DECIMAL, TIMESTAMP, collection types).
The behavior is inconsistent: for some types it throws in
the catalog, for VARCHAR/CHAR these become strings. This changes
behavior so that all fail during analysis. Analysis tests
were added.

Similarly, external tables cannot contain Kudu types that
Impala doesn't support (e.g. UNIXTIME_MICROS, BINARY). Tests
were added to validate this behavior. Note that this
required upgrading the python Kudu client.

This also fixes a small corner case with ALTER TABLE:
ALTER TABLE shouldn't allow Kudu tables to change the
storage descriptor tblproperty, otherwise the table metadata
gets in an inconsistent state.

Tests were added for all of the above.

Change-Id: I475273cbbf4110db8d0f78ddf9a56abfc6221e3e
Reviewed-on: http://gerrit.cloudera.org:8080/4857
Reviewed-by: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
This commit is contained in:
Matthew Jacobs
2016-10-26 19:08:00 -07:00
committed by Tim Armstrong
parent 0d857237a8
commit 9b507b6ed6
12 changed files with 136 additions and 66 deletions

View File

@@ -20,9 +20,12 @@ package org.apache.impala.analysis;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.RowFormat;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.THdfsFileFormat;
import org.apache.impala.thrift.TTableName;
@@ -37,9 +40,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
/**
* Represents a CREATE TABLE statement.
*/
@@ -266,6 +266,15 @@ public class CreateTableStmt extends StatementBase {
getTblProperties().put(KuduTable.KEY_TABLE_NAME,
KuduUtil.getDefaultCreateKuduTableName(getDb(), getTbl()));
}
// Check column types are valid Kudu types
for (ColumnDef col: getColumnDefs()) {
try {
KuduUtil.fromImpalaType(col.getType());
} catch (ImpalaRuntimeException e) {
throw new AnalysisException(String.format(
"Cannot create table '%s': %s", getTbl(), e.getMessage()));
}
}
AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_KEY_COLUMNS),
String.format("PRIMARY KEY must be used instead of the table property '%s'.",
KuduTable.KEY_KEY_COLUMNS));

View File

@@ -49,14 +49,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.impala.analysis.FunctionName;
import org.apache.impala.analysis.TableName;
import org.apache.impala.authorization.User;
@@ -150,6 +142,14 @@ import org.apache.impala.thrift.TTruncateParams;
import org.apache.impala.thrift.TUpdateCatalogRequest;
import org.apache.impala.thrift.TUpdateCatalogResponse;
import org.apache.impala.util.HdfsCachingUtil;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* Class used to execute Catalog Operations, including DDL and refresh/invalidate
@@ -2082,8 +2082,9 @@ public class CatalogOpExecutor {
tbl.getMetaStoreTable().deepCopy();
switch (params.getTarget()) {
case TBL_PROPERTY:
boolean isKuduTable = KuduTable.isKuduTable(msTbl);
msTbl.getParameters().putAll(properties);
if (KuduTable.isKuduTable(msTbl)) {
if (isKuduTable) {
KuduCatalogOpExecutor.validateKuduTblExists(msTbl);
}
break;

View File

@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Preconditions;
@@ -220,15 +221,24 @@ public class KuduCatalogOpExecutor {
}
/**
* Validates the table properties of a Kudu table. It checks that the specified master
* addresses point to valid Kudu masters and that the table exists.
* Validates the table properties of a Kudu table. It checks that the msTbl represents
* a Kudu table (indicated by the Kudu storage handler), that the master
* addresses point to valid Kudu masters, and that the table exists.
* Throws an ImpalaRuntimeException if this is not the case.
*/
public static void validateKuduTblExists(
org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
Map<String, String> properties = msTbl.getParameters();
if (!KuduTable.isKuduTable(msTbl)) {
throw new ImpalaRuntimeException(String.format("Table '%s' does not represent a " +
"Kudu table. Expected storage_handler '%s' but found '%s'",
msTbl.getTableName(), KuduTable.KUDU_STORAGE_HANDLER,
properties.get(KuduTable.KEY_STORAGE_HANDLER)));
}
String masterHosts = properties.get(KuduTable.KEY_MASTER_HOSTS);
Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts));
String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
kudu.tableExists(kuduTableName);

View File

@@ -206,7 +206,7 @@ public class KuduUtil {
throws ImpalaRuntimeException {
if (!t.isScalarType()) {
throw new ImpalaRuntimeException(format(
"Non-scalar type %s is not supported in Kudu", t.toSql()));
"Type %s is not supported in Kudu", t.toSql()));
}
ScalarType s = (ScalarType) t;
switch (s.getPrimitiveType()) {
@@ -215,9 +215,7 @@ public class KuduUtil {
case INT: return org.apache.kudu.Type.INT32;
case BIGINT: return org.apache.kudu.Type.INT64;
case BOOLEAN: return org.apache.kudu.Type.BOOL;
case CHAR: return org.apache.kudu.Type.STRING;
case STRING: return org.apache.kudu.Type.STRING;
case VARCHAR: return org.apache.kudu.Type.STRING;
case DOUBLE: return org.apache.kudu.Type.DOUBLE;
case FLOAT: return org.apache.kudu.Type.FLOAT;
/* Fall through below */
@@ -228,6 +226,8 @@ public class KuduUtil {
case DATE:
case DATETIME:
case DECIMAL:
case CHAR:
case VARCHAR:
default:
throw new ImpalaRuntimeException(format(
"Type %s is not supported in Kudu", s.toSql()));
@@ -247,7 +247,7 @@ public class KuduUtil {
case STRING: return Type.STRING;
default:
throw new ImpalaRuntimeException(String.format(
"Kudu type %s is not supported in Impala", t));
"Kudu type '%s' is not supported in Impala", t.getName()));
}
}

View File

@@ -25,9 +25,12 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import junit.framework.Assert;
import org.apache.impala.analysis.CreateTableStmt;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.ColumnStats;
@@ -45,20 +48,14 @@ import org.apache.impala.common.FrontendTestBase;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.testutil.TestUtils;
import org.apache.impala.util.MetaStoreUtil;
import org.junit.Test;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Test;
import junit.framework.Assert;
public class AnalyzeDDLTest extends FrontendTestBase {
@@ -1352,6 +1349,29 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"tblproperties('kudu.table_name'='t') as select id, int_col from " +
"functional.alltypestiny", "CREATE TABLE AS SELECT is not supported for " +
"external Kudu tables.");
// CTAS into Kudu tables with unsupported types
AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
" stored as kudu as select id, timestamp_col from functional.alltypestiny",
"Cannot create table 't': Type TIMESTAMP is not supported in Kudu");
AnalysisError("create table t primary key (cs) distribute by hash into 3 buckets" +
" stored as kudu as select cs from functional.chars_tiny",
"Cannot create table 't': Type CHAR(5) is not supported in Kudu");
AnalysisError("create table t primary key (vc) distribute by hash into 3 buckets" +
" stored as kudu as select vc from functional.chars_tiny",
"Cannot create table 't': Type VARCHAR(32) is not supported in Kudu");
AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
" stored as kudu as select id, s from functional.complextypes_fileformat",
"Expr 's' in select list returns a complex type 'STRUCT<f1:STRING,f2:INT>'.\n" +
"Only scalar types are allowed in the select list.");
AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
" stored as kudu as select id, m from functional.complextypes_fileformat",
"Expr 'm' in select list returns a complex type 'MAP<STRING,BIGINT>'.\n" +
"Only scalar types are allowed in the select list.");
AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
" stored as kudu as select id, a from functional.complextypes_fileformat",
"Expr 'a' in select list returns a complex type 'ARRAY<INT>'.\n" +
"Only scalar types are allowed in the select list.");
}
@Test
@@ -1832,6 +1852,25 @@ public class AnalyzeDDLTest extends FrontendTestBase {
AnalysisError("create table tab (x int primary key) " +
"partitioned by (y int) stored as kudu", "PARTITIONED BY cannot be used " +
"in Kudu tables.");
// Test unsupported Kudu types
List<String> unsupportedTypes = Lists.newArrayList(
"DECIMAL(9,0)", "TIMESTAMP", "VARCHAR(20)", "CHAR(20)",
"STRUCT<F1:INT,F2:STRING>", "ARRAY<INT>", "MAP<STRING,STRING>");
for (String t: unsupportedTypes) {
String expectedError = String.format(
"Cannot create table 'tab': Type %s is not supported in Kudu", t);
// Unsupported type is PK and partition col
String stmt = String.format("create table tab (x %s primary key) " +
"distribute by hash(x) into 3 buckets stored as kudu", t);
AnalysisError(stmt, expectedError);
// Unsupported type is not PK/partition col
stmt = String.format("create table tab (x int primary key, y %s) " +
"distribute by hash(x) into 3 buckets stored as kudu", t);
AnalysisError(stmt, expectedError);
}
}
@Test

View File

@@ -29,5 +29,5 @@ PY26="$(./find_py26.py)"
"$PY26" pip_download.py virtualenv 13.1.0
# kudu-python is downloaded separately because pip install attempts to execute a
# setup.py subcommand for kudu-python that can fail even if the download succeeds.
"$PY26" pip_download.py kudu-python 0.2.0
"$PY26" pip_download.py kudu-python 0.3.0
popd

View File

@@ -82,7 +82,7 @@ texttable == 0.8.3
# functional and determines the expected kudu-python version. The version must be listed
# in the format below including # and spacing. Keep this formatting! The kudu-python
# version in download_requirements must be kept in sync with this version.
# kudu-python==0.2.0
# kudu-python==0.3.0
Cython == 0.23.4
numpy == 1.10.4

View File

@@ -40,3 +40,8 @@ select count(*) from simple_new;
---- TYPES
BIGINT
====
---- QUERY
alter table simple_new set tblproperties ('storage_handler'='foo')
---- CATCH
ImpalaRuntimeException: Table 'simple_new' does not represent a Kudu table. Expected storage_handler 'com.cloudera.kudu.hive.KuduStorageHandler' but found 'foo'
====

View File

@@ -1,30 +1,5 @@
====
---- QUERY
# This test file contains several cases for what basically amount to analysis errors,
# but they only show up at runtime. These cases correspond to constraints enforced by
# the Kudu storage engine.
#
# Incompatible column types in CTAS.
create table t primary key (id) distribute by hash (id) into 3 buckets
stored as kudu
as select * from functional.alltypestiny
---- CATCH
ImpalaRuntimeException: Type TIMESTAMP is not supported in Kudu
====
---- QUERY
create table t primary key (id) distribute by hash (id) into 3 buckets
stored as kudu
as select c1 as id from functional.decimal_tiny
---- CATCH
ImpalaRuntimeException: Type DECIMAL(10,4) is not supported in Kudu
====
---- QUERY
create table t (a int, b array<string>, primary key(a)) distribute by hash (a)
into 3 buckets stored as kudu
---- CATCH
ImpalaRuntimeException: Non-scalar type ARRAY<STRING> is not supported in Kudu
====
---- QUERY
create table t primary key (id) distribute by hash (id) into 3 buckets
stored as kudu
as select id, int_col from functional.alltypestiny;

View File

@@ -18,15 +18,15 @@ Couldn't resolve this master's address bogus.host.name:7051
====
---- QUERY
create table tdata
(id int primary key, name string, valf float, vali bigint, valv varchar(20), valb boolean)
(id int primary key, name string, valf float, vali bigint, valv string, valb boolean)
DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU
---- RESULTS
====
---- QUERY
insert into tdata values
(1, "martin", 1.0, 232232323, cast('a' as VARCHAR(20)), true),
(2, "david", cast(1.0 as float), 99398493939, cast('b' as VARCHAR(20)), false),
(3, "todd", cast(1.0 as float), 993393939, cast('c' as VARCHAR(20)), true)
(2, "david", cast(1.0 as float), 99398493939, cast('b' as CHAR(1)), false),
(3, "todd", cast(1.0 as float), 993393939, "c", true)
---- RESULTS
: 3
---- RUNTIME_PROFILE
@@ -50,7 +50,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
====
====
---- QUERY
# Try updating a varchar col. with a value that is bigger than it's size (truncated).
# Try updating a string col where casting a value that is bigger than the varchar in the
# cast. The value gets truncated and stored to the string col.
update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as VARCHAR(20)) where id = 1
---- RESULTS
---- RUNTIME_PROFILE

View File

@@ -28,7 +28,9 @@ from kudu.schema import (
INT64,
INT8,
SchemaBuilder,
STRING)
STRING,
BINARY,
UNIXTIME_MICROS)
from kudu.client import Partitioning
from random import choice, sample
from string import ascii_lowercase, digits

View File

@@ -23,7 +23,9 @@ from kudu.schema import (
INT32,
INT64,
INT8,
STRING)
STRING,
BINARY,
UNIXTIME_MICROS)
import logging
import pytest
import textwrap
@@ -79,8 +81,6 @@ class TestCreateExternalTable(KuduTestSuite):
def test_col_types(self, cursor, kudu_client):
"""Check that a table can be created using all available column types."""
# TODO: The python Kudu client doesn't yet support TIMESTAMP or BYTE[], those should
# be tested for graceful failure.
kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8]
with self.temp_kudu_table(kudu_client, kudu_types) as kudu_table:
impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
@@ -96,6 +96,34 @@ class TestCreateExternalTable(KuduTestSuite):
assert col_type.upper() == \
self.kudu_col_type_to_impala_col_type(kudu_col.type.type)
def test_unsupported_binary_col(self, cursor, kudu_client):
"""Check that external tables with BINARY columns fail gracefully.
"""
with self.temp_kudu_table(kudu_client, [INT32, BINARY]) as kudu_table:
impala_table_name = self.random_table_name()
try:
cursor.execute("""
CREATE EXTERNAL TABLE %s
STORED AS KUDU
TBLPROPERTIES('kudu.table_name' = '%s')""" % (impala_table_name,
kudu_table.name))
except Exception as e:
assert "Kudu type 'binary' is not supported in Impala" in str(e)
def test_unsupported_unixtime_col(self, cursor, kudu_client):
"""Check that external tables with UNIXTIME_MICROS columns fail gracefully.
"""
with self.temp_kudu_table(kudu_client, [INT32, UNIXTIME_MICROS]) as kudu_table:
impala_table_name = self.random_table_name()
try:
cursor.execute("""
CREATE EXTERNAL TABLE %s
STORED AS KUDU
TBLPROPERTIES('kudu.table_name' = '%s')""" % (impala_table_name,
kudu_table.name))
except Exception as e:
assert "Kudu type 'unixtime_micros' is not supported in Impala" in str(e)
def test_drop_external_table(self, cursor, kudu_client):
"""Check that dropping an external table only affects the catalog and does not delete
the table in Kudu.