mirror of
https://github.com/apache/impala.git
synced 2025-12-31 06:02:51 -05:00
IMPALA-4379: Fix and test Kudu table type checking
Creating Kudu tables shouldn't allow types not supported by Kudu (e.g. VARCHAR/CHAR, DECIMAL, TIMESTAMP, collection types). The behavior is inconsistent: for some types it throws in the catalog, for VARCHAR/CHAR these become strings. This changes behavior so that all fail during analysis. Analysis tests were added. Similarly, external tables cannot contain Kudu types that Impala doesn't support (e.g. UNIXTIME_MICROS, BINARY). Tests were added to validate this behavior. Note that this required upgrading the python Kudu client. This also fixes a small corner case with ALTER TABLE: ALTER TABLE shouldn't allow Kudu tables to change the storage descriptor tblproperty, otherwise the table metadata gets in an inconsistent state. Tests were added for all of the above. Change-Id: I475273cbbf4110db8d0f78ddf9a56abfc6221e3e Reviewed-on: http://gerrit.cloudera.org:8080/4857 Reviewed-by: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com> Tested-by: Tim Armstrong <tarmstrong@cloudera.com>
This commit is contained in:
committed by
Tim Armstrong
parent
0d857237a8
commit
9b507b6ed6
@@ -20,9 +20,12 @@ package org.apache.impala.analysis;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.SchemaParseException;
|
||||
import org.apache.impala.catalog.KuduTable;
|
||||
import org.apache.impala.catalog.RowFormat;
|
||||
import org.apache.impala.common.AnalysisException;
|
||||
import org.apache.impala.common.ImpalaRuntimeException;
|
||||
import org.apache.impala.thrift.TCreateTableParams;
|
||||
import org.apache.impala.thrift.THdfsFileFormat;
|
||||
import org.apache.impala.thrift.TTableName;
|
||||
@@ -37,9 +40,6 @@ import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Ints;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.SchemaParseException;
|
||||
|
||||
/**
|
||||
* Represents a CREATE TABLE statement.
|
||||
*/
|
||||
@@ -266,6 +266,15 @@ public class CreateTableStmt extends StatementBase {
|
||||
getTblProperties().put(KuduTable.KEY_TABLE_NAME,
|
||||
KuduUtil.getDefaultCreateKuduTableName(getDb(), getTbl()));
|
||||
}
|
||||
// Check column types are valid Kudu types
|
||||
for (ColumnDef col: getColumnDefs()) {
|
||||
try {
|
||||
KuduUtil.fromImpalaType(col.getType());
|
||||
} catch (ImpalaRuntimeException e) {
|
||||
throw new AnalysisException(String.format(
|
||||
"Cannot create table '%s': %s", getTbl(), e.getMessage()));
|
||||
}
|
||||
}
|
||||
AnalysisUtils.throwIfNotNull(getTblProperties().get(KuduTable.KEY_KEY_COLUMNS),
|
||||
String.format("PRIMARY KEY must be used instead of the table property '%s'.",
|
||||
KuduTable.KEY_KEY_COLUMNS));
|
||||
|
||||
@@ -49,14 +49,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
|
||||
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
|
||||
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
|
||||
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.impala.analysis.FunctionName;
|
||||
import org.apache.impala.analysis.TableName;
|
||||
import org.apache.impala.authorization.User;
|
||||
@@ -150,6 +142,14 @@ import org.apache.impala.thrift.TTruncateParams;
|
||||
import org.apache.impala.thrift.TUpdateCatalogRequest;
|
||||
import org.apache.impala.thrift.TUpdateCatalogResponse;
|
||||
import org.apache.impala.util.HdfsCachingUtil;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
/**
|
||||
* Class used to execute Catalog Operations, including DDL and refresh/invalidate
|
||||
@@ -2082,8 +2082,9 @@ public class CatalogOpExecutor {
|
||||
tbl.getMetaStoreTable().deepCopy();
|
||||
switch (params.getTarget()) {
|
||||
case TBL_PROPERTY:
|
||||
boolean isKuduTable = KuduTable.isKuduTable(msTbl);
|
||||
msTbl.getParameters().putAll(properties);
|
||||
if (KuduTable.isKuduTable(msTbl)) {
|
||||
if (isKuduTable) {
|
||||
KuduCatalogOpExecutor.validateKuduTblExists(msTbl);
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -22,6 +22,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@@ -220,15 +221,24 @@ public class KuduCatalogOpExecutor {
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the table properties of a Kudu table. It checks that the specified master
|
||||
* addresses point to valid Kudu masters and that the table exists.
|
||||
* Validates the table properties of a Kudu table. It checks that the msTbl represents
|
||||
* a Kudu table (indicated by the Kudu storage handler), that the master
|
||||
* addresses point to valid Kudu masters, and that the table exists.
|
||||
* Throws an ImpalaRuntimeException if this is not the case.
|
||||
*/
|
||||
public static void validateKuduTblExists(
|
||||
org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException {
|
||||
String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
|
||||
Map<String, String> properties = msTbl.getParameters();
|
||||
if (!KuduTable.isKuduTable(msTbl)) {
|
||||
throw new ImpalaRuntimeException(String.format("Table '%s' does not represent a " +
|
||||
"Kudu table. Expected storage_handler '%s' but found '%s'",
|
||||
msTbl.getTableName(), KuduTable.KUDU_STORAGE_HANDLER,
|
||||
properties.get(KuduTable.KEY_STORAGE_HANDLER)));
|
||||
}
|
||||
|
||||
String masterHosts = properties.get(KuduTable.KEY_MASTER_HOSTS);
|
||||
Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts));
|
||||
String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);
|
||||
String kuduTableName = properties.get(KuduTable.KEY_TABLE_NAME);
|
||||
Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName));
|
||||
try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) {
|
||||
kudu.tableExists(kuduTableName);
|
||||
|
||||
@@ -206,7 +206,7 @@ public class KuduUtil {
|
||||
throws ImpalaRuntimeException {
|
||||
if (!t.isScalarType()) {
|
||||
throw new ImpalaRuntimeException(format(
|
||||
"Non-scalar type %s is not supported in Kudu", t.toSql()));
|
||||
"Type %s is not supported in Kudu", t.toSql()));
|
||||
}
|
||||
ScalarType s = (ScalarType) t;
|
||||
switch (s.getPrimitiveType()) {
|
||||
@@ -215,9 +215,7 @@ public class KuduUtil {
|
||||
case INT: return org.apache.kudu.Type.INT32;
|
||||
case BIGINT: return org.apache.kudu.Type.INT64;
|
||||
case BOOLEAN: return org.apache.kudu.Type.BOOL;
|
||||
case CHAR: return org.apache.kudu.Type.STRING;
|
||||
case STRING: return org.apache.kudu.Type.STRING;
|
||||
case VARCHAR: return org.apache.kudu.Type.STRING;
|
||||
case DOUBLE: return org.apache.kudu.Type.DOUBLE;
|
||||
case FLOAT: return org.apache.kudu.Type.FLOAT;
|
||||
/* Fall through below */
|
||||
@@ -228,6 +226,8 @@ public class KuduUtil {
|
||||
case DATE:
|
||||
case DATETIME:
|
||||
case DECIMAL:
|
||||
case CHAR:
|
||||
case VARCHAR:
|
||||
default:
|
||||
throw new ImpalaRuntimeException(format(
|
||||
"Type %s is not supported in Kudu", s.toSql()));
|
||||
@@ -247,7 +247,7 @@ public class KuduUtil {
|
||||
case STRING: return Type.STRING;
|
||||
default:
|
||||
throw new ImpalaRuntimeException(String.format(
|
||||
"Kudu type %s is not supported in Impala", t));
|
||||
"Kudu type '%s' is not supported in Impala", t.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -25,9 +25,12 @@ import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.impala.analysis.CreateTableStmt;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.impala.catalog.ArrayType;
|
||||
import org.apache.impala.catalog.CatalogException;
|
||||
import org.apache.impala.catalog.ColumnStats;
|
||||
@@ -45,20 +48,14 @@ import org.apache.impala.common.FrontendTestBase;
|
||||
import org.apache.impala.common.RuntimeEnv;
|
||||
import org.apache.impala.testutil.TestUtils;
|
||||
import org.apache.impala.util.MetaStoreUtil;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsAction;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
||||
import org.junit.Test;
|
||||
import junit.framework.Assert;
|
||||
|
||||
public class AnalyzeDDLTest extends FrontendTestBase {
|
||||
|
||||
@@ -1352,6 +1349,29 @@ public class AnalyzeDDLTest extends FrontendTestBase {
|
||||
"tblproperties('kudu.table_name'='t') as select id, int_col from " +
|
||||
"functional.alltypestiny", "CREATE TABLE AS SELECT is not supported for " +
|
||||
"external Kudu tables.");
|
||||
|
||||
// CTAS into Kudu tables with unsupported types
|
||||
AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
|
||||
" stored as kudu as select id, timestamp_col from functional.alltypestiny",
|
||||
"Cannot create table 't': Type TIMESTAMP is not supported in Kudu");
|
||||
AnalysisError("create table t primary key (cs) distribute by hash into 3 buckets" +
|
||||
" stored as kudu as select cs from functional.chars_tiny",
|
||||
"Cannot create table 't': Type CHAR(5) is not supported in Kudu");
|
||||
AnalysisError("create table t primary key (vc) distribute by hash into 3 buckets" +
|
||||
" stored as kudu as select vc from functional.chars_tiny",
|
||||
"Cannot create table 't': Type VARCHAR(32) is not supported in Kudu");
|
||||
AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
|
||||
" stored as kudu as select id, s from functional.complextypes_fileformat",
|
||||
"Expr 's' in select list returns a complex type 'STRUCT<f1:STRING,f2:INT>'.\n" +
|
||||
"Only scalar types are allowed in the select list.");
|
||||
AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
|
||||
" stored as kudu as select id, m from functional.complextypes_fileformat",
|
||||
"Expr 'm' in select list returns a complex type 'MAP<STRING,BIGINT>'.\n" +
|
||||
"Only scalar types are allowed in the select list.");
|
||||
AnalysisError("create table t primary key (id) distribute by hash into 3 buckets" +
|
||||
" stored as kudu as select id, a from functional.complextypes_fileformat",
|
||||
"Expr 'a' in select list returns a complex type 'ARRAY<INT>'.\n" +
|
||||
"Only scalar types are allowed in the select list.");
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -1832,6 +1852,25 @@ public class AnalyzeDDLTest extends FrontendTestBase {
|
||||
AnalysisError("create table tab (x int primary key) " +
|
||||
"partitioned by (y int) stored as kudu", "PARTITIONED BY cannot be used " +
|
||||
"in Kudu tables.");
|
||||
|
||||
// Test unsupported Kudu types
|
||||
List<String> unsupportedTypes = Lists.newArrayList(
|
||||
"DECIMAL(9,0)", "TIMESTAMP", "VARCHAR(20)", "CHAR(20)",
|
||||
"STRUCT<F1:INT,F2:STRING>", "ARRAY<INT>", "MAP<STRING,STRING>");
|
||||
for (String t: unsupportedTypes) {
|
||||
String expectedError = String.format(
|
||||
"Cannot create table 'tab': Type %s is not supported in Kudu", t);
|
||||
|
||||
// Unsupported type is PK and partition col
|
||||
String stmt = String.format("create table tab (x %s primary key) " +
|
||||
"distribute by hash(x) into 3 buckets stored as kudu", t);
|
||||
AnalysisError(stmt, expectedError);
|
||||
|
||||
// Unsupported type is not PK/partition col
|
||||
stmt = String.format("create table tab (x int primary key, y %s) " +
|
||||
"distribute by hash(x) into 3 buckets stored as kudu", t);
|
||||
AnalysisError(stmt, expectedError);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -29,5 +29,5 @@ PY26="$(./find_py26.py)"
|
||||
"$PY26" pip_download.py virtualenv 13.1.0
|
||||
# kudu-python is downloaded separately because pip install attempts to execute a
|
||||
# setup.py subcommand for kudu-python that can fail even if the download succeeds.
|
||||
"$PY26" pip_download.py kudu-python 0.2.0
|
||||
"$PY26" pip_download.py kudu-python 0.3.0
|
||||
popd
|
||||
|
||||
@@ -82,7 +82,7 @@ texttable == 0.8.3
|
||||
# functional and determines the expected kudu-python version. The version must be listed
|
||||
# in the format below including # and spacing. Keep this formatting! The kudu-python
|
||||
# version in download_requirements must be kept in sync with this version.
|
||||
# kudu-python==0.2.0
|
||||
# kudu-python==0.3.0
|
||||
Cython == 0.23.4
|
||||
numpy == 1.10.4
|
||||
|
||||
|
||||
@@ -40,3 +40,8 @@ select count(*) from simple_new;
|
||||
---- TYPES
|
||||
BIGINT
|
||||
====
|
||||
---- QUERY
|
||||
alter table simple_new set tblproperties ('storage_handler'='foo')
|
||||
---- CATCH
|
||||
ImpalaRuntimeException: Table 'simple_new' does not represent a Kudu table. Expected storage_handler 'com.cloudera.kudu.hive.KuduStorageHandler' but found 'foo'
|
||||
====
|
||||
|
||||
@@ -1,30 +1,5 @@
|
||||
====
|
||||
---- QUERY
|
||||
# This test file contains several cases for what basically amount to analysis errors,
|
||||
# but they only show up at runtime. These cases correspond to constraints enforced by
|
||||
# the Kudu storage engine.
|
||||
#
|
||||
# Incompatible column types in CTAS.
|
||||
create table t primary key (id) distribute by hash (id) into 3 buckets
|
||||
stored as kudu
|
||||
as select * from functional.alltypestiny
|
||||
---- CATCH
|
||||
ImpalaRuntimeException: Type TIMESTAMP is not supported in Kudu
|
||||
====
|
||||
---- QUERY
|
||||
create table t primary key (id) distribute by hash (id) into 3 buckets
|
||||
stored as kudu
|
||||
as select c1 as id from functional.decimal_tiny
|
||||
---- CATCH
|
||||
ImpalaRuntimeException: Type DECIMAL(10,4) is not supported in Kudu
|
||||
====
|
||||
---- QUERY
|
||||
create table t (a int, b array<string>, primary key(a)) distribute by hash (a)
|
||||
into 3 buckets stored as kudu
|
||||
---- CATCH
|
||||
ImpalaRuntimeException: Non-scalar type ARRAY<STRING> is not supported in Kudu
|
||||
====
|
||||
---- QUERY
|
||||
create table t primary key (id) distribute by hash (id) into 3 buckets
|
||||
stored as kudu
|
||||
as select id, int_col from functional.alltypestiny;
|
||||
|
||||
@@ -18,15 +18,15 @@ Couldn't resolve this master's address bogus.host.name:7051
|
||||
====
|
||||
---- QUERY
|
||||
create table tdata
|
||||
(id int primary key, name string, valf float, vali bigint, valv varchar(20), valb boolean)
|
||||
(id int primary key, name string, valf float, vali bigint, valv string, valb boolean)
|
||||
DISTRIBUTE BY RANGE SPLIT ROWS ((10), (30)) STORED AS KUDU
|
||||
---- RESULTS
|
||||
====
|
||||
---- QUERY
|
||||
insert into tdata values
|
||||
(1, "martin", 1.0, 232232323, cast('a' as VARCHAR(20)), true),
|
||||
(2, "david", cast(1.0 as float), 99398493939, cast('b' as VARCHAR(20)), false),
|
||||
(3, "todd", cast(1.0 as float), 993393939, cast('c' as VARCHAR(20)), true)
|
||||
(2, "david", cast(1.0 as float), 99398493939, cast('b' as CHAR(1)), false),
|
||||
(3, "todd", cast(1.0 as float), 993393939, "c", true)
|
||||
---- RESULTS
|
||||
: 3
|
||||
---- RUNTIME_PROFILE
|
||||
@@ -50,7 +50,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
|
||||
====
|
||||
====
|
||||
---- QUERY
|
||||
# Try updating a varchar col. with a value that is bigger than it's size (truncated).
|
||||
# Try updating a string col where casting a value that is bigger than the varchar in the
|
||||
# cast. The value gets truncated and stored to the string col.
|
||||
update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as VARCHAR(20)) where id = 1
|
||||
---- RESULTS
|
||||
---- RUNTIME_PROFILE
|
||||
|
||||
@@ -28,7 +28,9 @@ from kudu.schema import (
|
||||
INT64,
|
||||
INT8,
|
||||
SchemaBuilder,
|
||||
STRING)
|
||||
STRING,
|
||||
BINARY,
|
||||
UNIXTIME_MICROS)
|
||||
from kudu.client import Partitioning
|
||||
from random import choice, sample
|
||||
from string import ascii_lowercase, digits
|
||||
|
||||
@@ -23,7 +23,9 @@ from kudu.schema import (
|
||||
INT32,
|
||||
INT64,
|
||||
INT8,
|
||||
STRING)
|
||||
STRING,
|
||||
BINARY,
|
||||
UNIXTIME_MICROS)
|
||||
import logging
|
||||
import pytest
|
||||
import textwrap
|
||||
@@ -79,8 +81,6 @@ class TestCreateExternalTable(KuduTestSuite):
|
||||
|
||||
def test_col_types(self, cursor, kudu_client):
|
||||
"""Check that a table can be created using all available column types."""
|
||||
# TODO: The python Kudu client doesn't yet support TIMESTAMP or BYTE[], those should
|
||||
# be tested for graceful failure.
|
||||
kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8]
|
||||
with self.temp_kudu_table(kudu_client, kudu_types) as kudu_table:
|
||||
impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
|
||||
@@ -96,6 +96,34 @@ class TestCreateExternalTable(KuduTestSuite):
|
||||
assert col_type.upper() == \
|
||||
self.kudu_col_type_to_impala_col_type(kudu_col.type.type)
|
||||
|
||||
def test_unsupported_binary_col(self, cursor, kudu_client):
|
||||
"""Check that external tables with BINARY columns fail gracefully.
|
||||
"""
|
||||
with self.temp_kudu_table(kudu_client, [INT32, BINARY]) as kudu_table:
|
||||
impala_table_name = self.random_table_name()
|
||||
try:
|
||||
cursor.execute("""
|
||||
CREATE EXTERNAL TABLE %s
|
||||
STORED AS KUDU
|
||||
TBLPROPERTIES('kudu.table_name' = '%s')""" % (impala_table_name,
|
||||
kudu_table.name))
|
||||
except Exception as e:
|
||||
assert "Kudu type 'binary' is not supported in Impala" in str(e)
|
||||
|
||||
def test_unsupported_unixtime_col(self, cursor, kudu_client):
|
||||
"""Check that external tables with UNIXTIME_MICROS columns fail gracefully.
|
||||
"""
|
||||
with self.temp_kudu_table(kudu_client, [INT32, UNIXTIME_MICROS]) as kudu_table:
|
||||
impala_table_name = self.random_table_name()
|
||||
try:
|
||||
cursor.execute("""
|
||||
CREATE EXTERNAL TABLE %s
|
||||
STORED AS KUDU
|
||||
TBLPROPERTIES('kudu.table_name' = '%s')""" % (impala_table_name,
|
||||
kudu_table.name))
|
||||
except Exception as e:
|
||||
assert "Kudu type 'unixtime_micros' is not supported in Impala" in str(e)
|
||||
|
||||
def test_drop_external_table(self, cursor, kudu_client):
|
||||
"""Check that dropping an external table only affects the catalog and does not delete
|
||||
the table in Kudu.
|
||||
|
||||
Reference in New Issue
Block a user