IMPALA-3726: Add support for Kudu-specific column options

This commit adds support for Kudu-specific column options in CREATE
TABLE statements. The syntax is:
CREATE TABLE tbl_name ([col_name type [PRIMARY KEY] [option [...]]] [, ...])
where option is:
| NULL
| NOT NULL
| ENCODING encoding_val
| COMPRESSION compression_algorithm
| DEFAULT expr
| BLOCK_SIZE num
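
For illustration, a Kudu table definition using several of these options might look
as follows (table, column names and option values are hypothetical, and the remaining
Kudu-specific table clauses are elided):

CREATE TABLE kudu_tbl (
  id BIGINT PRIMARY KEY ENCODING PLAIN_ENCODING COMPRESSION LZ4 BLOCK_SIZE 32768,
  name STRING NOT NULL DEFAULT 'unknown' COMMENT 'display name',
  score DOUBLE NULL
) ...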

The output of the SHOW CREATE TABLE statement now includes all specified column
options for Kudu tables.

Change-Id: I727b9ae1b7b2387db752b58081398dd3f3449c02
Reviewed-on: http://gerrit.cloudera.org:8080/5026
Reviewed-by: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Tested-by: Internal Jenkins
Author:    Dimitris Tsirogiannis
Date:      2016-11-09 15:11:07 -08:00
Committer: Internal Jenkins
Parent:    60414f0633
Commit:    3db5ced4ce
31 changed files with 973 additions and 254 deletions

View File

@@ -195,17 +195,13 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
bool add_row = true;
for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
// For INSERT, output_expr_ctxs_ will contain all columns of the table in order.
// For UPDATE and UPSERT, output_expr_ctxs_ only contains the columns that the op
// output_expr_ctxs_ only contains the columns that the op
// applies to, i.e. columns explicitly mentioned in the query, and
// referenced_columns is then used to map to actual column positions.
int col = kudu_table_sink_.referenced_columns.empty() ?
j : kudu_table_sink_.referenced_columns[j];
void* value = output_expr_ctxs_[j]->GetValue(current_row);
// If the value is NULL, we only need to explicitly set it for UPDATE and UPSERT.
// For INSERT, it can be ignored as unspecified cols will be implicitly set to NULL.
if (value == NULL) {
if (table_schema.Column(j).is_nullable()) {
KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),

View File

@@ -72,7 +72,18 @@ enum THdfsCompression {
SNAPPY,
SNAPPY_BLOCKED,
LZO,
LZ4
LZ4,
ZLIB
}
enum TColumnEncoding {
AUTO,
PLAIN,
PREFIX,
GROUP_VARINT,
RLE,
DICTIONARY,
BIT_SHUFFLE
}
enum THdfsSeqCompressionMode {
@@ -191,11 +202,14 @@ struct TColumn {
8: optional string column_qualifier
9: optional bool is_binary
// Indicates whether this is a Kudu column. If true implies all following Kudu specific
// fields are set.
// All the following are Kudu-specific column properties
10: optional bool is_kudu_column
11: optional bool is_key
12: optional bool is_nullable
13: optional TColumnEncoding encoding
14: optional THdfsCompression compression
15: optional Exprs.TExpr default_value
16: optional i32 block_size
}
// Represents a block in an HDFS file

View File

@@ -18,15 +18,18 @@
package org.apache.impala.analysis;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java_cup.runtime.Symbol;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.analysis.ColumnDef.Option;
import org.apache.impala.analysis.UnionStmt.Qualifier;
import org.apache.impala.analysis.UnionStmt.UnionOperand;
import org.apache.impala.analysis.RangePartition;
@@ -240,16 +243,17 @@ parser code {:
// ALL KEYWORDS ALSO NEED TO BE ADDED TO THE ident_or_kw PRODUCTION.
terminal
KW_ADD, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_ANALYTIC, KW_AND, KW_ANTI, KW_API_VERSION,
KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BOOLEAN,
KW_BUCKETS, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE, KW_CHAR,
KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPUTE, KW_CREATE,
KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME,
KW_DECIMAL, KW_DELETE, KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISTINCT, KW_DISTRIBUTE,
KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN,
KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN,
KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL,
KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF,
KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BLOCKSIZE,
KW_BOOLEAN, KW_BUCKETS, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE,
KW_CHAR, KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPRESSION,
KW_COMPUTE, KW_CREATE, KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES,
KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE, KW_DELIMITED, KW_DESC,
KW_DESCRIBE, KW_DISTINCT, KW_DISTRIBUTE, KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE,
KW_ENCODING, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL,
KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT,
KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION,
KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF, KW_ILIKE,
KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN,
KW_KUDU, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP,
KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORDER,
@@ -280,6 +284,10 @@ terminal String STRING_LITERAL;
terminal String UNMATCHED_STRING_LITERAL;
terminal String UNEXPECTED_CHAR;
// IMPALA-3726 introduced the DEFAULT keyword which could break existing applications
// that use the identifier "default" as database, column or table names. To avoid that,
// the ident_or_default non-terminal is introduced and should be used instead of IDENT.
nonterminal String ident_or_keyword, ident_or_default;
nonterminal StatementBase stmt;
// Single select statement.
nonterminal SelectStmt select_stmt;
@@ -399,7 +407,6 @@ nonterminal CreateDataSrcStmt create_data_src_stmt;
nonterminal DropDataSrcStmt drop_data_src_stmt;
nonterminal ShowDataSrcsStmt show_data_srcs_stmt;
nonterminal StructField struct_field_def;
nonterminal String ident_or_keyword;
nonterminal DistributeParam distribute_hash_param;
nonterminal List<RangePartition> range_params_list;
nonterminal RangePartition range_param;
@@ -415,7 +422,7 @@ nonterminal ArrayList<StructField> struct_field_def_list;
// Options for DDL commands - CREATE/DROP/ALTER
nonterminal HdfsCachingOp cache_op_val;
nonterminal BigDecimal opt_cache_op_replication;
nonterminal String comment_val;
nonterminal String comment_val, opt_comment_val;
nonterminal Boolean external_val;
nonterminal Boolean purge_val;
nonterminal String opt_init_string_val;
@@ -444,6 +451,13 @@ nonterminal String opt_kw_column;
nonterminal String opt_kw_table;
nonterminal Boolean overwrite_val;
nonterminal Boolean cascade_val;
nonterminal Boolean nullability_val;
nonterminal String encoding_val;
nonterminal String compression_val;
nonterminal Expr default_val;
nonterminal LiteralExpr block_size_val;
nonterminal Pair<Option, Object> column_option;
nonterminal Map<Option, Object> column_options_map;
// For GRANT/REVOKE/AUTH DDL statements
nonterminal ShowRolesStmt show_roles_stmt;
@@ -487,6 +501,7 @@ nonterminal TFunctionCategory opt_function_category;
precedence left KW_OR;
precedence left KW_AND;
precedence right KW_NOT, NOT;
precedence left KW_DEFAULT;
precedence left KW_BETWEEN, KW_IN, KW_IS, KW_EXISTS;
precedence left KW_LIKE, KW_RLIKE, KW_ILIKE, KW_REGEXP, KW_IREGEXP;
precedence left EQUAL, NOTEQUAL, LESSTHAN, GREATERTHAN, KW_FROM, KW_DISTINCT;
@@ -789,31 +804,33 @@ opt_kw_table ::=
show_roles_stmt ::=
KW_SHOW KW_ROLES
{: RESULT = new ShowRolesStmt(false, null); :}
| KW_SHOW KW_ROLE KW_GRANT KW_GROUP IDENT:group
| KW_SHOW KW_ROLE KW_GRANT KW_GROUP ident_or_default:group
{: RESULT = new ShowRolesStmt(false, group); :}
| KW_SHOW KW_CURRENT KW_ROLES
{: RESULT = new ShowRolesStmt(true, null); :}
;
show_grant_role_stmt ::=
KW_SHOW KW_GRANT KW_ROLE IDENT:role
KW_SHOW KW_GRANT KW_ROLE ident_or_default:role
{: RESULT = new ShowGrantRoleStmt(role, null); :}
| KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON server_ident:server_kw
| KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON server_ident:server_kw
{:
RESULT = new ShowGrantRoleStmt(role,
PrivilegeSpec.createServerScopedPriv(TPrivilegeLevel.ALL));
:}
| KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON KW_DATABASE IDENT:db_name
| KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON
KW_DATABASE ident_or_default:db_name
{:
RESULT = new ShowGrantRoleStmt(role,
PrivilegeSpec.createDbScopedPriv(TPrivilegeLevel.ALL, db_name));
:}
| KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON KW_TABLE table_name:tbl_name
| KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON KW_TABLE table_name:tbl_name
{:
RESULT = new ShowGrantRoleStmt(role,
PrivilegeSpec.createTableScopedPriv(TPrivilegeLevel.ALL, tbl_name));
:}
| KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON uri_ident:uri_kw STRING_LITERAL:uri
| KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON uri_ident:uri_kw
STRING_LITERAL:uri
{:
RESULT = new ShowGrantRoleStmt(role,
PrivilegeSpec.createUriScopedPriv(TPrivilegeLevel.ALL, new HdfsUri(uri)));
@@ -821,40 +838,40 @@ show_grant_role_stmt ::=
;
create_drop_role_stmt ::=
KW_CREATE KW_ROLE IDENT:role
KW_CREATE KW_ROLE ident_or_default:role
{: RESULT = new CreateDropRoleStmt(role, false); :}
| KW_DROP KW_ROLE IDENT:role
| KW_DROP KW_ROLE ident_or_default:role
{: RESULT = new CreateDropRoleStmt(role, true); :}
;
grant_role_stmt ::=
KW_GRANT KW_ROLE IDENT:role KW_TO KW_GROUP IDENT:group
KW_GRANT KW_ROLE ident_or_default:role KW_TO KW_GROUP ident_or_default:group
{: RESULT = new GrantRevokeRoleStmt(role, group, true); :}
;
revoke_role_stmt ::=
KW_REVOKE KW_ROLE IDENT:role KW_FROM KW_GROUP IDENT:group
KW_REVOKE KW_ROLE ident_or_default:role KW_FROM KW_GROUP ident_or_default:group
{: RESULT = new GrantRevokeRoleStmt(role, group, false); :}
;
grant_privilege_stmt ::=
KW_GRANT privilege_spec:priv KW_TO opt_kw_role:opt_role IDENT:role
KW_GRANT privilege_spec:priv KW_TO opt_kw_role:opt_role ident_or_default:role
opt_with_grantopt:grant_opt
{: RESULT = new GrantRevokePrivStmt(role, priv, true, grant_opt); :}
;
revoke_privilege_stmt ::=
KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
opt_kw_role:opt_role IDENT:role
opt_kw_role:opt_role ident_or_default:role
{: RESULT = new GrantRevokePrivStmt(role, priv, false, grant_opt); :}
;
privilege_spec ::=
privilege:priv KW_ON server_ident:server_kw
{: RESULT = PrivilegeSpec.createServerScopedPriv(priv); :}
| privilege:priv KW_ON server_ident:server_kw IDENT:server_name
| privilege:priv KW_ON server_ident:server_kw ident_or_default:server_name
{: RESULT = PrivilegeSpec.createServerScopedPriv(priv, server_name); :}
| privilege:priv KW_ON KW_DATABASE IDENT:db_name
| privilege:priv KW_ON KW_DATABASE ident_or_default:db_name
{: RESULT = PrivilegeSpec.createDbScopedPriv(priv, db_name); :}
| privilege:priv KW_ON KW_TABLE table_name:tbl_name
{: RESULT = PrivilegeSpec.createTableScopedPriv(priv, tbl_name); :}
@@ -902,9 +919,9 @@ alter_tbl_stmt ::=
RESULT = new AlterTableAddPartitionStmt(table, partition,
location, if_not_exists, cache_op);
:}
| KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column IDENT:col_name
| KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column ident_or_default:col_name
{: RESULT = new AlterTableDropColStmt(table, col_name); :}
| KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column IDENT:col_name
| KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column ident_or_default:col_name
column_def:col_def
{: RESULT = new AlterTableChangeColStmt(table, col_name, col_def); :}
| KW_ALTER KW_TABLE table_name:table KW_DROP if_exists_val:if_exists
@@ -927,7 +944,7 @@ alter_tbl_stmt ::=
table_property_type:target LPAREN properties_map:properties RPAREN
{: RESULT = new AlterTableSetTblProperties(table, partitions, target, properties); :}
| KW_ALTER KW_TABLE table_name:table opt_partition_set:partition KW_SET
KW_COLUMN KW_STATS IDENT:col LPAREN properties_map:map RPAREN
KW_COLUMN KW_STATS ident_or_default:col LPAREN properties_map:map RPAREN
{:
// The opt_partition_set is used to avoid conflicts even though
// a partition clause does not make sense for this stmt. If a partition
@@ -966,8 +983,8 @@ replace_existing_cols_val ::=
;
create_db_stmt ::=
KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists IDENT:db_name
comment_val:comment location_val:location
KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists ident_or_default:db_name
opt_comment_val:comment location_val:location
{: RESULT = new CreateDbStmt(db_name, comment, location, if_not_exists); :}
;
@@ -1032,9 +1049,9 @@ create_tbl_stmt ::=
RESULT = new CreateTableStmt(tbl_def);
:}
| tbl_def_with_col_defs:tbl_def
KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id IDENT:data_src_name
KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id ident_or_default:data_src_name
opt_init_string_val:init_string
comment_val:comment
opt_comment_val:comment
{:
// Need external_val in the grammar to avoid shift/reduce conflict with other
// CREATE TABLE statements.
@@ -1067,7 +1084,7 @@ create_tbl_stmt ::=
create_tbl_like_stmt ::=
tbl_def_without_col_defs:tbl_def
KW_LIKE table_name:other_table
comment_val:comment
opt_comment_val:comment
file_format_create_table_val:file_format location_val:location
{:
RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), other_table,
@@ -1089,7 +1106,8 @@ tbl_def_with_col_defs ::=
tbl_def.getColumnDefs().addAll(list);
RESULT = tbl_def;
:}
| tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA primary_keys:primary_keys RPAREN
| tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA
primary_keys:primary_keys RPAREN
{:
tbl_def.getColumnDefs().addAll(list);
tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
@@ -1103,7 +1121,7 @@ primary_keys ::=
;
tbl_options ::=
comment_val:comment row_format_val:row_format serde_properties:serde_props
opt_comment_val:comment row_format_val:row_format serde_properties:serde_props
file_format_create_table_val:file_format location_val:location cache_op_val:cache_op
tbl_properties:tbl_props
{:
@@ -1280,6 +1298,11 @@ opt_cache_op_replication ::=
;
comment_val ::=
KW_COMMENT STRING_LITERAL:comment
{: RESULT = comment; :}
;
opt_comment_val ::=
KW_COMMENT STRING_LITERAL:comment
{: RESULT = comment; :}
| /* empty */
@@ -1422,20 +1445,81 @@ column_def_list ::=
;
column_def ::=
IDENT:col_name type_def:type is_primary_key_val:primary_key comment_val:comment
{: RESULT = new ColumnDef(col_name, type, primary_key, comment); :}
ident_or_default:col_name type_def:type column_options_map:options
{: RESULT = new ColumnDef(col_name, type, options); :}
| ident_or_default:col_name type_def:type
{: RESULT = new ColumnDef(col_name, type); :}
;
column_options_map ::=
column_options_map:map column_option:col_option
{:
if (map.put(col_option.first, col_option.second) != null) {
throw new Exception(String.format("Column option %s is specified multiple times",
col_option.first.toString()));
}
RESULT = map;
:}
| column_option:col_option
{:
Map<Option, Object> options = Maps.newHashMap();
options.put(col_option.first, col_option.second);
RESULT = options;
:}
;
column_option ::=
is_primary_key_val:primary_key
{: RESULT = new Pair<Option, Object>(Option.IS_PRIMARY_KEY, primary_key); :}
| nullability_val:nullability
{: RESULT = new Pair<Option, Object>(Option.IS_NULLABLE, nullability); :}
| encoding_val:encoding
{: RESULT = new Pair<Option, Object>(Option.ENCODING, encoding); :}
| compression_val:compression
{: RESULT = new Pair<Option, Object>(Option.COMPRESSION, compression); :}
| default_val:default_val
{: RESULT = new Pair<Option, Object>(Option.DEFAULT, default_val); :}
| block_size_val:block_size
{: RESULT = new Pair<Option, Object>(Option.BLOCK_SIZE, block_size); :}
| comment_val:comment
{: RESULT = new Pair<Option, Object>(Option.COMMENT, comment); :}
;
is_primary_key_val ::=
KW_PRIMARY key_ident
{: RESULT = true; :}
| /* empty */
;
nullability_val ::=
KW_NOT KW_NULL
{: RESULT = false; :}
| KW_NULL
{: RESULT = true; :}
;
encoding_val ::=
KW_ENCODING ident_or_default:encoding_ident
{: RESULT = encoding_ident; :}
;
compression_val ::=
KW_COMPRESSION ident_or_default:compression_ident
{: RESULT = compression_ident; :}
;
default_val ::=
KW_DEFAULT expr:default_val
{: RESULT = default_val; :}
;
block_size_val ::=
KW_BLOCKSIZE literal:block_size
{: RESULT = block_size; :}
;
create_view_stmt ::=
KW_CREATE KW_VIEW if_not_exists_val:if_not_exists table_name:view_name
view_column_defs:col_defs comment_val:comment KW_AS query_stmt:view_def
view_column_defs:col_defs opt_comment_val:comment KW_AS query_stmt:view_def
{:
RESULT = new CreateViewStmt(if_not_exists, view_name, col_defs, comment, view_def);
:}
@@ -1443,7 +1527,7 @@ create_view_stmt ::=
create_data_src_stmt ::=
KW_CREATE KW_DATA source_ident:is_source_id
if_not_exists_val:if_not_exists IDENT:data_src_name
if_not_exists_val:if_not_exists ident_or_default:data_src_name
KW_LOCATION STRING_LITERAL:location
KW_CLASS STRING_LITERAL:class_name
KW_API_VERSION STRING_LITERAL:api_version
@@ -1534,8 +1618,12 @@ view_column_def_list ::=
;
view_column_def ::=
IDENT:col_name comment_val:comment
{: RESULT = new ColumnDef(col_name, null, comment); :}
ident_or_default:col_name opt_comment_val:comment
{:
Map<Option, Object> options = Maps.newHashMap();
if (comment != null) options.put(Option.COMMENT, comment);
RESULT = new ColumnDef(col_name, null, options);
:}
;
alter_view_stmt ::=
@@ -1571,7 +1659,8 @@ drop_stats_stmt ::=
;
drop_db_stmt ::=
KW_DROP db_or_schema_kw if_exists_val:if_exists IDENT:db_name cascade_val:cascade
KW_DROP db_or_schema_kw if_exists_val:if_exists ident_or_default:db_name
cascade_val:cascade
{: RESULT = new DropDbStmt(db_name, if_exists, cascade); :}
;
@@ -1593,7 +1682,8 @@ drop_function_stmt ::=
;
drop_data_src_stmt ::=
KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists IDENT:data_src_name
KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists
ident_or_default:data_src_name
{: RESULT = new DropDataSrcStmt(data_src_name, if_exists); :}
;
@@ -1683,7 +1773,7 @@ static_partition_key_value_list ::=
partition_key_value ::=
// Dynamic partition key values.
IDENT:column
ident_or_default:column
{: RESULT = new PartitionKeyValue(column, null); :}
| static_partition_key_value:partition
{: RESULT = partition; :}
@@ -1691,7 +1781,7 @@ partition_key_value ::=
static_partition_key_value ::=
// Static partition key values.
IDENT:column EQUAL expr:e
ident_or_default:column EQUAL expr:e
{: RESULT = new PartitionKeyValue(column, e); :}
;
@@ -1825,11 +1915,12 @@ opt_with_clause ::=
;
with_view_def ::=
IDENT:alias KW_AS LPAREN query_stmt:query RPAREN
ident_or_default:alias KW_AS LPAREN query_stmt:query RPAREN
{: RESULT = new View(alias, query, null); :}
| STRING_LITERAL:alias KW_AS LPAREN query_stmt:query RPAREN
{: RESULT = new View(alias, query, null); :}
| IDENT:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN query_stmt:query RPAREN
| ident_or_default:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN
query_stmt:query RPAREN
{: RESULT = new View(alias, query, col_names); :}
| STRING_LITERAL:alias LPAREN ident_list:col_names RPAREN
KW_AS LPAREN query_stmt:query RPAREN
@@ -1957,7 +2048,7 @@ values_operand_list ::=
;
use_stmt ::=
KW_USE IDENT:db
KW_USE ident_or_default:db
{: RESULT = new UseStmt(db); :}
;
@@ -1966,9 +2057,9 @@ show_tables_stmt ::=
{: RESULT = new ShowTablesStmt(); :}
| KW_SHOW KW_TABLES show_pattern:showPattern
{: RESULT = new ShowTablesStmt(showPattern); :}
| KW_SHOW KW_TABLES KW_IN IDENT:db
| KW_SHOW KW_TABLES KW_IN ident_or_default:db
{: RESULT = new ShowTablesStmt(db, null); :}
| KW_SHOW KW_TABLES KW_IN IDENT:db show_pattern:showPattern
| KW_SHOW KW_TABLES KW_IN ident_or_default:db show_pattern:showPattern
{: RESULT = new ShowTablesStmt(db, showPattern); :}
;
@@ -1996,9 +2087,9 @@ show_functions_stmt ::=
{: RESULT = new ShowFunctionsStmt(null, null, fn_type); :}
| KW_SHOW opt_function_category:fn_type KW_FUNCTIONS show_pattern:showPattern
{: RESULT = new ShowFunctionsStmt(null, showPattern, fn_type); :}
| KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN IDENT:db
| KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN ident_or_default:db
{: RESULT = new ShowFunctionsStmt(db, null, fn_type); :}
| KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN IDENT:db
| KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN ident_or_default:db
show_pattern:showPattern
{: RESULT = new ShowFunctionsStmt(db, showPattern, fn_type); :}
;
@@ -2051,7 +2142,7 @@ show_files_stmt ::=
;
describe_db_stmt ::=
KW_DESCRIBE db_or_schema_kw describe_output_style:style IDENT:db
KW_DESCRIBE db_or_schema_kw describe_output_style:style ident_or_default:db
{: RESULT = new DescribeDbStmt(db, style); :}
;
@@ -2108,9 +2199,9 @@ select_clause ::=
;
set_stmt ::=
KW_SET IDENT:key EQUAL literal:l
KW_SET ident_or_default:key EQUAL literal:l
{: RESULT = new SetStmt(key, l.getStringValue()); :}
| KW_SET IDENT:key EQUAL IDENT:ident
| KW_SET ident_or_default:key EQUAL ident_or_default:ident
{: RESULT = new SetStmt(key, ident); :}
| KW_SET
{: RESULT = new SetStmt(null, null); :}
@@ -2140,9 +2231,9 @@ select_list_item ::=
;
alias_clause ::=
KW_AS IDENT:ident
KW_AS ident_or_default:ident
{: RESULT = ident; :}
| IDENT:ident
| ident_or_default:ident
{: RESULT = ident; :}
| KW_AS STRING_LITERAL:l
{: RESULT = l; :}
@@ -2158,9 +2249,9 @@ star_expr ::=
;
table_name ::=
IDENT:tbl
ident_or_default:tbl
{: RESULT = new TableName(null, tbl); :}
| IDENT:db DOT IDENT:tbl
| ident_or_default:db DOT ident_or_default:tbl
{: RESULT = new TableName(db, tbl); :}
;
@@ -2295,13 +2386,13 @@ opt_plan_hints ::=
;
ident_list ::=
IDENT:ident
ident_or_default:ident
{:
ArrayList<String> list = new ArrayList<String>();
list.add(ident);
RESULT = list;
:}
| ident_list:list COMMA IDENT:ident
| ident_list:list COMMA ident_or_default:ident
{:
list.add(ident);
RESULT = list;
@@ -2525,7 +2616,7 @@ function_call_expr ::=
| function_name:fn_name LPAREN function_params:params RPAREN
{: RESULT = FunctionCallExpr.createExpr(fn_name, params); :}
// Below is a special case for EXTRACT. Idents are used to avoid adding new keywords.
| function_name:fn_name LPAREN IDENT:u KW_FROM expr:t RPAREN
| function_name:fn_name LPAREN ident_or_default:u KW_FROM expr:t RPAREN
{: RESULT = new ExtractFromExpr(fn_name, u, t); :}
;
@@ -2828,13 +2919,13 @@ slot_ref ::=
;
dotted_path ::=
IDENT:ident
ident_or_default:ident
{:
ArrayList<String> list = new ArrayList<String>();
list.add(ident);
RESULT = list;
:}
| dotted_path:list DOT IDENT:ident
| dotted_path:list DOT ident_or_default:ident
{:
list.add(ident);
RESULT = list;
@@ -2895,7 +2986,7 @@ type ::=
// that we can parse type strings from the Hive Metastore which
// may have unquoted identifiers corresponding to keywords.
struct_field_def ::=
ident_or_keyword:name COLON type:t comment_val:comment
ident_or_keyword:name COLON type:t opt_comment_val:comment
{: RESULT = new StructField(name, t, comment); :}
;
@@ -2913,6 +3004,13 @@ struct_field_def_list ::=
:}
;
ident_or_default ::=
IDENT:name
{: RESULT = name.toString(); :}
| KW_DEFAULT:name
{: RESULT = name.toString(); :}
;
ident_or_keyword ::=
IDENT:r
{: RESULT = r.toString(); :}
@@ -2946,6 +3044,8 @@ ident_or_keyword ::=
{: RESULT = r.toString(); :}
| KW_BINARY:r
{: RESULT = r.toString(); :}
| KW_BLOCKSIZE:r
{: RESULT = r.toString(); :}
| KW_BOOLEAN:r
{: RESULT = r.toString(); :}
| KW_BUCKETS:r
@@ -2974,6 +3074,8 @@ ident_or_keyword ::=
{: RESULT = r.toString(); :}
| KW_COMMENT:r
{: RESULT = r.toString(); :}
| KW_COMPRESSION:r
{: RESULT = r.toString(); :}
| KW_COMPUTE:r
{: RESULT = r.toString(); :}
| KW_CREATE:r
@@ -2994,6 +3096,8 @@ ident_or_keyword ::=
{: RESULT = r.toString(); :}
| KW_DECIMAL:r
{: RESULT = r.toString(); :}
| KW_DEFAULT:r
{: RESULT = r.toString(); :}
| KW_DELETE:r
{: RESULT = r.toString(); :}
| KW_DELIMITED:r
@@ -3014,6 +3118,8 @@ ident_or_keyword ::=
{: RESULT = r.toString(); :}
| KW_ELSE:r
{: RESULT = r.toString(); :}
| KW_ENCODING:r
{: RESULT = r.toString(); :}
| KW_END:r
{: RESULT = r.toString(); :}
| KW_ESCAPED:r
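
For illustration, because identifiers now go through the ident_or_default production,
statements that use "default" as a plain identifier still parse, e.g. (table name is
hypothetical):

USE default;
SELECT id FROM default.my_tbl;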

View File

@@ -90,7 +90,7 @@ public class AlterTableAddReplaceColsStmt extends AlterTableStmt {
// partition columns.
Set<String> colNames = Sets.newHashSet();
for (ColumnDef c: columnDefs_) {
c.analyze();
c.analyze(analyzer);
String colName = c.getColName().toLowerCase();
if (existingPartitionKeys.contains(colName)) {
throw new AnalysisException(

View File

@@ -90,7 +90,7 @@ public class AlterTableChangeColStmt extends AlterTableStmt {
}
// Check that the new column def's name is valid.
newColDef_.analyze();
newColDef_.analyze(analyzer);
// Verify that if the column name is being changed, the new name doesn't conflict
// with an existing column.
if (!colName_.toLowerCase().equals(newColDef_.getColName().toLowerCase()) &&

View File

@@ -18,21 +18,26 @@
package org.apache.impala.analysis;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.Map;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.util.KuduUtil;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
import org.apache.kudu.ColumnSchema.Encoding;
/**
* Represents a column definition in a CREATE/ALTER TABLE/VIEW statement.
@@ -40,31 +45,89 @@ import org.apache.impala.util.MetaStoreUtil;
* whereas column definitions in CREATE/ALTER VIEW statements infer the column type from
* the corresponding view definition. All column definitions have an optional comment.
* Since a column definition refers a column stored in the Metastore, the column name
* must be valid according to the Metastore's rules (see @MetaStoreUtils).
* must be valid according to the Metastore's rules (see @MetaStoreUtils). A number of
* additional column options may be specified for Kudu tables.
*/
public class ColumnDef {
private final String colName_;
private String comment_;
// Required in CREATE/ALTER TABLE stmts. Set to NULL in CREATE/ALTER VIEW stmts,
// for which we setType() after analyzing the defining view definition stmt.
private final TypeDef typeDef_;
private Type type_;
private String comment_;
// Set to true if the user specified "PRIMARY KEY" in the column definition. Kudu table
// definitions may use this.
private boolean isPrimaryKey_;
public ColumnDef(String colName, TypeDef typeDef, String comment) {
this(colName, typeDef, false, comment);
// Available column options
public enum Option {
IS_PRIMARY_KEY,
IS_NULLABLE,
ENCODING,
COMPRESSION,
DEFAULT,
BLOCK_SIZE,
COMMENT
}
public ColumnDef(String colName, TypeDef typeDef, boolean isPrimaryKey,
String comment) {
// Kudu-specific column options
//
// Set to true if the user specified "PRIMARY KEY" in the column definition.
private boolean isPrimaryKey_;
// Set to true if this column may contain null values. Can be NULL if
// not specified.
private Boolean isNullable_;
private String encodingVal_;
// Encoding for this column; set in analysis.
private Encoding encoding_;
private String compressionVal_;
// Compression algorithm for this column; set in analysis.
private CompressionAlgorithm compression_;
// Default value for this column.
private Expr defaultValue_;
// Desired block size for this column.
private LiteralExpr blockSize_;
public ColumnDef(String colName, TypeDef typeDef, Map<Option, Object> options) {
Preconditions.checkNotNull(options);
colName_ = colName.toLowerCase();
typeDef_ = typeDef;
isPrimaryKey_ = isPrimaryKey;
comment_ = comment;
for (Map.Entry<Option, Object> option: options.entrySet()) {
switch (option.getKey()) {
case IS_PRIMARY_KEY:
Preconditions.checkState(option.getValue() instanceof Boolean);
isPrimaryKey_ = (Boolean) option.getValue();
break;
case IS_NULLABLE:
Preconditions.checkState(option.getValue() instanceof Boolean);
isNullable_ = (Boolean) option.getValue();
break;
case ENCODING:
Preconditions.checkState(option.getValue() instanceof String);
encodingVal_ = ((String) option.getValue()).toUpperCase();
break;
case COMPRESSION:
Preconditions.checkState(option.getValue() instanceof String);
compressionVal_ = ((String) option.getValue()).toUpperCase();
break;
case DEFAULT:
Preconditions.checkState(option.getValue() instanceof Expr);
defaultValue_ = (Expr) option.getValue();
break;
case BLOCK_SIZE:
Preconditions.checkState(option.getValue() instanceof LiteralExpr);
blockSize_ = (LiteralExpr) option.getValue();
break;
case COMMENT:
Preconditions.checkState(option.getValue() instanceof String);
comment_ = (String) option.getValue();
break;
default:
throw new IllegalStateException(String.format("Illegal option %s",
option.getKey()));
}
}
}
public ColumnDef(String colName, TypeDef typeDef) {
this(colName, typeDef, Collections.<Option, Object>emptyMap());
}
/**
@@ -81,8 +144,7 @@ public class ColumnDef {
colName_ = fs.getName();
typeDef_ = new TypeDef(type);
comment_ = fs.getComment();
isPrimaryKey_ = false;
analyze();
analyze(null);
}
public String getColName() { return colName_; }
@@ -92,8 +154,13 @@ public class ColumnDef {
boolean isPrimaryKey() { return isPrimaryKey_; }
public void setComment(String comment) { comment_ = comment; }
public String getComment() { return comment_; }
public boolean hasKuduOptions() {
return isPrimaryKey_ || isNullable_ != null || encodingVal_ != null
|| compressionVal_ != null || defaultValue_ != null || blockSize_ != null;
}
public boolean isNullable() { return isNullable_ != null && isNullable_; }
public void analyze() throws AnalysisException {
public void analyze(Analyzer analyzer) throws AnalysisException {
// Check whether the column name meets the Metastore's requirements.
if (!MetaStoreUtils.validateName(colName_)) {
throw new AnalysisException("Invalid column/field name: " + colName_);
@@ -112,6 +179,10 @@ public class ColumnDef {
"%s has %d characters.", colName_, MetaStoreUtil.MAX_TYPE_NAME_LENGTH,
typeSql, typeSql.length()));
}
if (hasKuduOptions()) {
Preconditions.checkNotNull(analyzer);
analyzeKuduOptions(analyzer);
}
if (comment_ != null &&
comment_.length() > MetaStoreUtil.CREATE_MAX_COMMENT_LENGTH) {
throw new AnalysisException(String.format(
@@ -121,6 +192,79 @@ public class ColumnDef {
}
}
private void analyzeKuduOptions(Analyzer analyzer) throws AnalysisException {
if (isPrimaryKey_ && isNullable_ != null && isNullable_) {
throw new AnalysisException("Primary key columns cannot be nullable: " +
toString());
}
// Encoding value
if (encodingVal_ != null) {
try {
encoding_ = Encoding.valueOf(encodingVal_);
} catch (IllegalArgumentException e) {
throw new AnalysisException(String.format("Unsupported encoding value '%s'. " +
"Supported encoding values are: %s", encodingVal_,
Joiner.on(", ").join(Encoding.values())));
}
}
// Compression algorithm
if (compressionVal_ != null) {
try {
compression_ = CompressionAlgorithm.valueOf(compressionVal_);
} catch (IllegalArgumentException e) {
throw new AnalysisException(String.format("Unsupported compression " +
"algorithm '%s'. Supported compression algorithms are: %s", compressionVal_,
Joiner.on(", ").join(CompressionAlgorithm.values())));
}
}
// Analyze the default value, if any.
// TODO: Similar checks are applied for range partition values in
// RangePartition.analyzeBoundaryValue(). Consider consolidating the logic into a
// single function.
if (defaultValue_ != null) {
try {
defaultValue_.analyze(analyzer);
} catch (AnalysisException e) {
throw new AnalysisException(String.format("Only constant values are allowed " +
"for default values: %s", defaultValue_.toSql()), e);
}
if (!defaultValue_.isConstant()) {
throw new AnalysisException(String.format("Only constant values are allowed " +
"for default values: %s", defaultValue_.toSql()));
}
defaultValue_ = LiteralExpr.create(defaultValue_, analyzer.getQueryCtx());
if (defaultValue_ == null) {
throw new AnalysisException(String.format("Only constant values are allowed " +
"for default values: %s", defaultValue_.toSql()));
}
if (defaultValue_.getType().isNull() && ((isNullable_ != null && !isNullable_)
|| isPrimaryKey_)) {
throw new AnalysisException(String.format("Default value of NULL not allowed " +
"on non-nullable column: '%s'", getColName()));
}
if (!Type.isImplicitlyCastable(defaultValue_.getType(), type_, true)) {
throw new AnalysisException(String.format("Default value %s (type: %s) " +
"is not compatible with column '%s' (type: %s).", defaultValue_.toSql(),
defaultValue_.getType().toSql(), colName_, type_.toSql()));
}
if (!defaultValue_.getType().equals(type_)) {
Expr castLiteral = defaultValue_.uncheckedCastTo(type_);
Preconditions.checkNotNull(castLiteral);
defaultValue_ = LiteralExpr.create(castLiteral, analyzer.getQueryCtx());
}
Preconditions.checkNotNull(defaultValue_);
}
// Analyze the block size value, if any.
if (blockSize_ != null) {
blockSize_.analyze(null);
if (!blockSize_.getType().isIntegerType()) {
throw new AnalysisException(String.format("Invalid value for BLOCK_SIZE: %s. " +
"A positive INTEGER value is expected.", blockSize_.toSql()));
}
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(colName_).append(" ");
@@ -130,6 +274,11 @@ public class ColumnDef {
sb.append(typeDef_);
}
if (isPrimaryKey_) sb.append(" PRIMARY KEY");
if (isNullable_ != null) sb.append(isNullable_ ? " NULL" : " NOT NULL");
if (encoding_ != null) sb.append(" ENCODING " + encoding_.toString());
if (compression_ != null) sb.append(" COMPRESSION " + compression_.toString());
if (defaultValue_ != null) sb.append(" DEFAULT_VALUE " + defaultValue_.toSql());
if (blockSize_ != null) sb.append(" BLOCK_SIZE " + blockSize_.toSql());
if (comment_ != null) sb.append(String.format(" COMMENT '%s'", comment_));
return sb.toString();
}
@@ -146,12 +295,21 @@ public class ColumnDef {
.append(isPrimaryKey_, rhs.isPrimaryKey_)
.append(typeDef_, rhs.typeDef_)
.append(type_, rhs.type_)
.append(isNullable_, rhs.isNullable_)
.append(encoding_, rhs.encoding_)
.append(compression_, rhs.compression_)
.append(defaultValue_, rhs.defaultValue_)
.append(blockSize_, rhs.blockSize_)
.isEquals();
}
public TColumn toThrift() {
TColumn col = new TColumn(new TColumn(getColName(), type_.toThrift()));
col.setComment(getComment());
TColumn col = new TColumn(getColName(), type_.toThrift());
Integer blockSize =
blockSize_ == null ? null : (int) ((NumericLiteral) blockSize_).getIntValue();
KuduUtil.setColumnOptions(col, isPrimaryKey_, isNullable_, encoding_,
compression_, defaultValue_, blockSize);
if (comment_ != null) col.setComment(comment_);
return col;
}

View File

@@ -18,6 +18,7 @@
package org.apache.impala.analysis;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -114,7 +115,7 @@ public abstract class CreateOrAlterViewStmtBase extends StatementBase {
List<String> labels = viewDefStmt_.getColLabels();
Preconditions.checkState(exprs.size() == labels.size());
for (int i = 0; i < viewDefStmt_.getColLabels().size(); ++i) {
ColumnDef colDef = new ColumnDef(labels.get(i), null, null);
ColumnDef colDef = new ColumnDef(labels.get(i), null);
colDef.setType(exprs.get(i).getType());
finalColDefs_.add(colDef);
}
@@ -124,7 +125,7 @@ public abstract class CreateOrAlterViewStmtBase extends StatementBase {
// duplicate column names.
Set<String> distinctColNames = Sets.newHashSet();
for (ColumnDef colDesc: finalColDefs_) {
colDesc.analyze();
colDesc.analyze(null);
if (!distinctColNames.add(colDesc.getColName().toLowerCase())) {
throw new AnalysisException("Duplicate column name: " + colDesc.getColName());
}

View File

@@ -17,8 +17,9 @@
package org.apache.impala.analysis;
import java.util.EnumSet;
import java.util.Collections;
import java.util.List;
import java.util.EnumSet;
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.Db;
@@ -147,7 +148,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
"mismatch: %s != %s", partitionLabel, colLabel));
}
ColumnDef colDef = new ColumnDef(colLabel, null, null);
ColumnDef colDef = new ColumnDef(colLabel, null);
colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
createStmt_.getPartitionColumnDefs().add(colDef);
}
@@ -159,8 +160,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
int colCnt = tmpQueryStmt.getColLabels().size();
createStmt_.getColumnDefs().clear();
for (int i = 0; i < colCnt; ++i) {
ColumnDef colDef = new ColumnDef(
tmpQueryStmt.getColLabels().get(i), null, null);
ColumnDef colDef = new ColumnDef(tmpQueryStmt.getColLabels().get(i), null,
Collections.<ColumnDef.Option, Object>emptyMap());
colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
createStmt_.getColumnDefs().add(colDef);
}

View File

@@ -21,9 +21,11 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
@@ -36,7 +38,6 @@ import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.HdfsCompression;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.MapType;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.StructField;
@@ -328,8 +329,9 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
Type type = convertParquetType(field);
Preconditions.checkNotNull(type);
String colName = field.getName();
schema.add(new ColumnDef(colName, new TypeDef(type),
"Inferred from Parquet file."));
Map<ColumnDef.Option, Object> option = Maps.newHashMap();
option.put(ColumnDef.Option.COMMENT, "Inferred from Parquet file.");
schema.add(new ColumnDef(colName, new TypeDef(type), option));
}
return schema;
}

View File

@@ -27,6 +27,7 @@ import org.apache.impala.authorization.PrivilegeRequestBuilder;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.HBaseTable;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.Type;
@@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -126,18 +126,20 @@ public class InsertStmt extends StatementBase {
// Output expressions that produce the final results to write to the target table. May
// include casts. Set in prepareExpressions().
// If this is an INSERT, will contain one Expr for all non-partition columns of the
// target table with NullLiterals where an output column isn't explicitly mentioned.
// The i'th expr produces the i'th column of the target table.
// If this is an UPSERT, will contain one Expr per column mentioned in the query and
// mentionedUpsertColumns_ is used to map between the Exprs and columns in the target
// table.
// If this is an INSERT on a non-Kudu table, it will contain one Expr for all
// non-partition columns of the target table with NullLiterals where an output
// column isn't explicitly mentioned. The i'th expr produces the i'th column of
// the target table.
//
// For Kudu tables (INSERT and UPSERT operations), it will contain one Expr per column
// mentioned in the query and mentionedColumns_ is used to map between the Exprs
// and columns in the target table.
private ArrayList<Expr> resultExprs_ = Lists.newArrayList();
// Position mapping of exprs in resultExprs_ to columns in the target table -
// resultExprs_[i] produces the mentionedUpsertColumns_[i] column of the target table.
// Only used for UPSERT, set in prepareExpressions().
private final List<Integer> mentionedUpsertColumns_ = Lists.newArrayList();
// resultExprs_[i] produces the mentionedColumns_[i] column of the target table.
// Only used for Kudu tables, set in prepareExpressions().
private final List<Integer> mentionedColumns_ = Lists.newArrayList();
// Set in analyze(). Exprs corresponding to key columns of Kudu tables. Empty for
// non-Kudu tables.
@@ -209,7 +211,7 @@ public class InsertStmt extends StatementBase {
hasNoShuffleHint_ = false;
hasClusteredHint_ = false;
resultExprs_.clear();
mentionedUpsertColumns_.clear();
mentionedColumns_.clear();
primaryKeyExprs_.clear();
}
@@ -277,8 +279,9 @@ public class InsertStmt extends StatementBase {
// Finally, prepareExpressions analyzes the expressions themselves, and confirms that
// they are type-compatible with the target columns. Where columns are not mentioned
// (and by this point, we know that missing columns are not partition columns),
// prepareExpressions assigns them a NULL literal expressions, unless this is an
// UPSERT, in which case we don't want to overwrite unmentioned columns with NULL.
// prepareExpressions assigns them a NULL literal expressions, unless the target is
// a Kudu table, in which case we don't want to overwrite unmentioned columns with
// NULL.
// An null permutation clause is the same as listing all non-partition columns in
// order.
@@ -602,7 +605,7 @@ public class InsertStmt extends StatementBase {
*
* 3. Populates resultExprs_ with type-compatible expressions, in Hive column order,
* for all expressions in the select-list. Unmentioned columns are assigned NULL literal
* expressions, unless this is an UPSERT.
* expressions, unless the target is a Kudu table.
*
* 4. Result exprs for key columns of Kudu tables are stored in primaryKeyExprs_.
*
@@ -667,6 +670,7 @@ public class InsertStmt extends StatementBase {
expr.analyze(analyzer);
}
boolean isKuduTable = table_ instanceof KuduTable;
// Finally, 'undo' the permutation so that the selectListExprs are in Hive column
// order, and add NULL expressions to all missing columns, unless this is an UPSERT.
ArrayList<Column> columns = table_.getColumnsInHiveOrder();
@@ -676,23 +680,32 @@ public class InsertStmt extends StatementBase {
for (int i = 0; i < selectListExprs.size(); ++i) {
if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
resultExprs_.add(selectListExprs.get(i));
if (isUpsert_) mentionedUpsertColumns_.add(col);
if (isKuduTable) mentionedColumns_.add(col);
matchFound = true;
break;
}
}
// If no match is found, either the column is a clustering column with a static
// value, or it was unmentioned and therefore should have a NULL select-list
// expression if this is an INSERT.
// expression if this is an INSERT and the target is not a Kudu table.
if (!matchFound) {
if (tblColumn.getPosition() >= numClusteringCols && !isUpsert_) {
// Unmentioned non-clustering columns get NULL literals with the appropriate
// target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
resultExprs_.add(NullLiteral.create(tblColumn.getType()));
if (tblColumn.getPosition() >= numClusteringCols) {
if (isKuduTable) {
Preconditions.checkState(tblColumn instanceof KuduColumn);
KuduColumn kuduCol = (KuduColumn) tblColumn;
if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()) {
throw new AnalysisException("Missing values for column that is not " +
"nullable and has no default value " + kuduCol.getName());
}
} else {
// Unmentioned non-clustering columns get NULL literals with the appropriate
// target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
resultExprs_.add(NullLiteral.create(tblColumn.getType()));
}
}
}
// Store exprs for Kudu key columns.
if (matchFound && table_ instanceof KuduTable) {
if (matchFound && isKuduTable) {
KuduTable kuduTable = (KuduTable) table_;
if (kuduTable.isPrimaryKeyColumn(tblColumn.getName())) {
primaryKeyExprs_.add(Iterables.getLast(resultExprs_));
@@ -779,9 +792,8 @@ public class InsertStmt extends StatementBase {
public DataSink createDataSink() {
// analyze() must have been called before.
Preconditions.checkState(table_ != null);
Preconditions.checkState(isUpsert_ || mentionedUpsertColumns_.isEmpty());
return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
partitionKeyExprs_, mentionedUpsertColumns_, overwrite_);
partitionKeyExprs_, mentionedColumns_, overwrite_);
}
/**

View File

@@ -174,7 +174,7 @@ class TableDef {
Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
fqTableName_ = analyzer.getFqTableName(getTblName());
fqTableName_.analyze();
analyzeColumnDefs();
analyzeColumnDefs(analyzer);
analyzePrimaryKeys();
if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE)
@@ -194,16 +194,20 @@ class TableDef {
* Analyzes table and partition column definitions, checking whether all column
* names are unique.
*/
private void analyzeColumnDefs() throws AnalysisException {
private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException {
Set<String> colNames = Sets.newHashSet();
for (ColumnDef colDef: columnDefs_) {
colDef.analyze();
colDef.analyze(analyzer);
if (!colNames.add(colDef.getColName().toLowerCase())) {
throw new AnalysisException("Duplicate column name: " + colDef.getColName());
}
if (getFileFormat() != THdfsFileFormat.KUDU && colDef.hasKuduOptions()) {
throw new AnalysisException(String.format("Unsupported column options for " +
"file format '%s': '%s'", getFileFormat().name(), colDef.toString()));
}
}
for (ColumnDef colDef: getPartitionColumnDefs()) {
colDef.analyze();
colDef.analyze(analyzer);
if (!colDef.getType().supportsTablePartitioning()) {
throw new AnalysisException(
String.format("Type '%s' is not supported as partition-column type " +
@@ -247,6 +251,10 @@ class TableDef {
throw new AnalysisException(String.format(
"PRIMARY KEY column '%s' does not exist in the table", colName));
}
if (colDef.isNullable()) {
throw new AnalysisException("Primary key columns cannot be nullable: " +
colDef.toString());
}
primaryKeyColDefs_.add(colDef);
}
}
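
For illustration, the new check in analyzeColumnDefs() rejects Kudu-only column options
on non-Kudu tables; a statement along these lines (names are hypothetical) now fails
with an "Unsupported column options" error:

CREATE TABLE t (i INT ENCODING RLE) STORED AS PARQUET;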

View File

@@ -41,6 +41,7 @@ import org.apache.impala.catalog.Function;
import org.apache.impala.catalog.HBaseTable;
import org.apache.impala.catalog.HdfsCompression;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.KuduColumn;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.catalog.RowFormat;
import org.apache.impala.catalog.Table;
@@ -351,6 +352,21 @@ public class ToSqlUtils {
private static String columnToSql(Column col) {
StringBuilder sb = new StringBuilder(col.getName());
if (col.getType() != null) sb.append(" " + col.getType().toSql());
if (col instanceof KuduColumn) {
KuduColumn kuduCol = (KuduColumn) col;
Boolean isNullable = kuduCol.isNullable();
if (isNullable != null) sb.append(isNullable ? " NULL" : " NOT NULL");
if (kuduCol.getEncoding() != null) sb.append(" ENCODING " + kuduCol.getEncoding());
if (kuduCol.getCompression() != null) {
sb.append(" COMPRESSION " + kuduCol.getCompression());
}
if (kuduCol.getDefaultValue() != null) {
sb.append(" DEFAULT " + kuduCol.getDefaultValue().toSql());
}
if (kuduCol.getBlockSize() != 0) {
sb.append(String.format(" BLOCK_SIZE %d", kuduCol.getBlockSize()));
}
}
if (!Strings.isNullOrEmpty(col.getComment())) {
sb.append(String.format(" COMMENT '%s'", col.getComment()));
}
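
For illustration, a Kudu column carrying these options would be rendered by
columnToSql() roughly as follows (values are hypothetical):

name STRING NOT NULL ENCODING PLAIN_ENCODING COMPRESSION LZ4 DEFAULT 'unknown' BLOCK_SIZE 32768 COMMENT 'display name'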

View File

@@ -31,6 +31,8 @@ import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.impala.common.ImpalaRuntimeException;
/**
* Internal representation of column-related metadata.
* Owned by Catalog instance.
@@ -84,7 +86,7 @@ public class Column {
.add("position_", position_).toString();
}
public static Column fromThrift(TColumn columnDesc) {
public static Column fromThrift(TColumn columnDesc) throws ImpalaRuntimeException {
String comment = columnDesc.isSetComment() ? columnDesc.getComment() : null;
Preconditions.checkState(columnDesc.isSetPosition());
int position = columnDesc.getPosition();
@@ -98,11 +100,7 @@ public class Column {
columnDesc.getColumn_qualifier(), columnDesc.isIs_binary(),
Type.fromThrift(columnDesc.getColumnType()), comment, position);
} else if (columnDesc.isIs_kudu_column()) {
Preconditions.checkState(columnDesc.isSetIs_key());
Preconditions.checkState(columnDesc.isSetIs_nullable());
col = new KuduColumn(columnDesc.getColumnName(), columnDesc.isIs_key(),
columnDesc.isIs_nullable(),
Type.fromThrift(columnDesc.getColumnType()), comment, position);
col = KuduColumn.fromThrift(columnDesc, position);
} else {
// Hdfs table column.
col = new Column(columnDesc.getColumnName(),

View File

@@ -17,36 +17,108 @@
package org.apache.impala.catalog;
import com.google.common.base.Preconditions;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.util.KuduUtil;
import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
import org.apache.kudu.ColumnSchema.Encoding;
import org.apache.kudu.ColumnSchema;
/**
* Describes a Kudu column mapped to a Hive column (as described in the metastore).
* This class extends Column with Kudu-specific information about whether it is part of a primary
* key, and whether it is nullable.
* Represents a Kudu column.
*
* This class extends Column with Kudu-specific information:
* - primary key
* - nullability constraint
* - encoding
* - compression
* - default value
* - desired block size
*/
public class KuduColumn extends Column {
private final boolean isKey_;
private final boolean isNullable_;
private final Encoding encoding_;
private final CompressionAlgorithm compression_;
private final LiteralExpr defaultValue_;
private final int blockSize_;
public KuduColumn(String name, boolean isKey, boolean isNullable, Type type,
String comment, int position) {
private KuduColumn(String name, Type type, boolean isKey, boolean isNullable,
Encoding encoding, CompressionAlgorithm compression, LiteralExpr defaultValue,
int blockSize, String comment, int position) {
super(name, type, comment, position);
isKey_ = isKey;
isNullable_ = isNullable;
encoding_ = encoding;
compression_ = compression;
defaultValue_ = defaultValue;
blockSize_ = blockSize;
}
public static KuduColumn fromColumnSchema(ColumnSchema colSchema, int position)
throws ImpalaRuntimeException {
Type type = KuduUtil.toImpalaType(colSchema.getType());
Object defaultValue = colSchema.getDefaultValue();
LiteralExpr defaultValueExpr = null;
if (defaultValue != null) {
try {
defaultValueExpr = LiteralExpr.create(defaultValue.toString(), type);
} catch (AnalysisException e) {
throw new ImpalaRuntimeException(String.format("Error parsing default value: " +
"'%s'", defaultValue), e);
}
Preconditions.checkNotNull(defaultValueExpr);
}
return new KuduColumn(colSchema.getName(), type, colSchema.isKey(),
colSchema.isNullable(), colSchema.getEncoding(),
colSchema.getCompressionAlgorithm(), defaultValueExpr,
colSchema.getDesiredBlockSize(), null, position);
}
public static KuduColumn fromThrift(TColumn column, int position)
throws ImpalaRuntimeException {
Preconditions.checkState(column.isSetIs_key());
Preconditions.checkState(column.isSetIs_nullable());
Type columnType = Type.fromThrift(column.getColumnType());
Encoding encoding = null;
if (column.isSetEncoding()) encoding = KuduUtil.fromThrift(column.getEncoding());
CompressionAlgorithm compression = null;
if (column.isSetCompression()) {
compression = KuduUtil.fromThrift(column.getCompression());
}
LiteralExpr defaultValue = null;
if (column.isSetDefault_value()) {
defaultValue =
LiteralExpr.fromThrift(column.getDefault_value().getNodes().get(0), columnType);
}
int blockSize = 0;
if (column.isSetBlock_size()) blockSize = column.getBlock_size();
return new KuduColumn(column.getColumnName(), columnType, column.isIs_key(),
column.isIs_nullable(), encoding, compression, defaultValue, blockSize, null,
position);
}
public boolean isKey() { return isKey_; }
public boolean isNullable() { return isNullable_; }
public Encoding getEncoding() { return encoding_; }
public CompressionAlgorithm getCompression() { return compression_; }
public LiteralExpr getDefaultValue() { return defaultValue_; }
public boolean hasDefaultValue() { return defaultValue_ != null; }
public int getBlockSize() { return blockSize_; }
@Override
public TColumn toThrift() {
TColumn colDesc = new TColumn(name_, type_.toThrift());
KuduUtil.setColumnOptions(colDesc, isKey_, isNullable_, encoding_, compression_,
defaultValue_, blockSize_);
if (comment_ != null) colDesc.setComment(comment_);
colDesc.setCol_stats(getStats().toThrift());
colDesc.setPosition(position_);
colDesc.setIs_kudu_column(true);
colDesc.setIs_key(isKey_);
colDesc.setIs_nullable(isNullable_);
return colDesc;
}
}

View File

@@ -215,12 +215,13 @@ public class KuduTable extends Table {
cols.clear();
int pos = 0;
for (ColumnSchema colSchema: kuduTable.getSchema().getColumns()) {
Type type = KuduUtil.toImpalaType(colSchema.getType());
String colName = colSchema.getName();
cols.add(new FieldSchema(colName, type.toSql().toLowerCase(), null));
boolean isKey = colSchema.isKey();
if (isKey) primaryKeyColumnNames_.add(colName);
addColumn(new KuduColumn(colName, isKey, !isKey, type, null, pos));
KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos);
Preconditions.checkNotNull(kuduCol);
// Add the HMS column
cols.add(new FieldSchema(kuduCol.getName(), kuduCol.getType().toSql().toLowerCase(),
null));
if (kuduCol.isKey()) primaryKeyColumnNames_.add(kuduCol.getName());
addColumn(kuduCol);
++pos;
}
}

View File

@@ -256,12 +256,17 @@ public abstract class Table implements CatalogObject {
colsByPos_.clear();
colsByPos_.ensureCapacity(columns.size());
for (int i = 0; i < columns.size(); ++i) {
Column col = Column.fromThrift(columns.get(i));
colsByPos_.add(col.getPosition(), col);
colsByName_.put(col.getName().toLowerCase(), col);
((StructType) type_.getItemType()).addField(
new StructField(col.getName(), col.getType(), col.getComment()));
try {
for (int i = 0; i < columns.size(); ++i) {
Column col = Column.fromThrift(columns.get(i));
colsByPos_.add(col.getPosition(), col);
colsByName_.put(col.getName().toLowerCase(), col);
((StructType) type_.getItemType()).addField(
new StructField(col.getName(), col.getType(), col.getComment()));
}
} catch (ImpalaRuntimeException e) {
throw new TableLoadingException(String.format("Error loading schema for " +
"table '%s'", getName()), e);
}
numClusteringCols_ = thriftTable.getClustering_columns().size();

View File

@@ -186,6 +186,8 @@ public class Planner {
Table targetTable = ctx_.getAnalysisResult().getInsertStmt().getTargetTable();
graph.addTargetColumnLabels(targetTable);
Preconditions.checkNotNull(targetTable);
// Lineage is not currently supported for Kudu tables (see IMPALA-4283)
if (targetTable instanceof KuduTable) return fragments;
List<Expr> exprs = Lists.newArrayList();
if (targetTable instanceof HBaseTable) {
exprs.addAll(resultExprs);

View File

@@ -31,6 +31,7 @@ import org.apache.impala.catalog.TableNotFoundException;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TCreateTableParams;
import org.apache.impala.thrift.TDistributeParam;
import org.apache.impala.thrift.TRangePartition;
@@ -74,7 +75,7 @@ public class KuduCatalogOpExecutor {
throw new ImpalaRuntimeException(String.format(
"Table '%s' already exists in Kudu.", kuduTableName));
}
Schema schema = createTableSchema(msTbl, params);
Schema schema = createTableSchema(params);
CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
kudu.createTable(kuduTableName, schema, tableOpts);
} catch (Exception e) {
@@ -86,22 +87,31 @@ public class KuduCatalogOpExecutor {
/**
* Creates the schema of a new Kudu table.
*/
private static Schema createTableSchema(
org.apache.hadoop.hive.metastore.api.Table msTbl, TCreateTableParams params)
private static Schema createTableSchema(TCreateTableParams params)
throws ImpalaRuntimeException {
Set<String> keyColNames = new HashSet<>(params.getPrimary_key_column_names());
List<FieldSchema> fieldSchemas = msTbl.getSd().getCols();
List<ColumnSchema> colSchemas = new ArrayList<>(fieldSchemas.size());
for (FieldSchema fieldSchema: fieldSchemas) {
Type type = Type.parseColumnType(fieldSchema.getType());
List<ColumnSchema> colSchemas = new ArrayList<>(params.getColumnsSize());
for (TColumn column: params.getColumns()) {
Type type = Type.fromThrift(column.getColumnType());
Preconditions.checkState(type != null);
org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);
// Create the actual column and check if the column is a key column
ColumnSchemaBuilder csb =
new ColumnSchemaBuilder(fieldSchema.getName(), kuduType);
boolean isKeyCol = keyColNames.contains(fieldSchema.getName());
csb.key(isKeyCol);
csb.nullable(!isKeyCol);
new ColumnSchemaBuilder(column.getColumnName(), kuduType);
Preconditions.checkState(column.isSetIs_key());
csb.key(keyColNames.contains(column.getColumnName()));
if (column.isSetIs_nullable()) csb.nullable(column.isIs_nullable());
if (column.isSetDefault_value()) {
csb.defaultValue(KuduUtil.getKuduDefaultValue(column.getDefault_value(), kuduType,
column.getColumnName()));
}
if (column.isSetBlock_size()) csb.desiredBlockSize(column.getBlock_size());
if (column.isSetEncoding()) {
csb.encoding(KuduUtil.fromThrift(column.getEncoding()));
}
if (column.isSetCompression()) {
csb.compressionAlgorithm(KuduUtil.fromThrift(column.getCompression()));
}
colSchemas.add(csb.build());
}
return new Schema(colSchemas);
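
For illustration only (not part of the patch): a minimal, hand-written sketch of the
ColumnSchemaBuilder calls that createTableSchema() ends up issuing for a column declared
as x INT PRIMARY KEY ENCODING RLE COMPRESSION LZ4 DEFAULT 0 BLOCK_SIZE 4096. The builder
methods are the same org.apache.kudu.ColumnSchema.ColumnSchemaBuilder calls used above;
the literal values are made up.

// Hypothetical, hand-built equivalent of the schema produced for one column.
ColumnSchema col =
    new ColumnSchema.ColumnSchemaBuilder("x", org.apache.kudu.Type.INT32)
        .key(true)                    // PRIMARY KEY
        .nullable(false)              // key columns are implicitly NOT NULL
        .encoding(ColumnSchema.Encoding.RLE)                           // ENCODING RLE
        .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.LZ4)   // COMPRESSION LZ4
        .defaultValue(0)              // DEFAULT 0 (an Integer for INT32)
        .desiredBlockSize(4096)       // BLOCK_SIZE 4096
        .build();
Schema schema = new Schema(java.util.Collections.singletonList(col));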

View File

@@ -29,10 +29,12 @@ import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.codehaus.jackson.JsonNode;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.analysis.TypeDef;
import org.apache.impala.catalog.ArrayType;
@@ -42,8 +44,7 @@ import org.apache.impala.catalog.StructField;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.codehaus.jackson.JsonNode;
/**
* Utility class used to parse Avro schema. Checks that the schema is valid
@@ -81,9 +82,12 @@ public class AvroSchemaParser {
}
List<ColumnDef> colDefs = Lists.newArrayListWithCapacity(schema.getFields().size());
for (Schema.Field field: schema.getFields()) {
Map<ColumnDef.Option, Object> option = Maps.newHashMap();
String comment = field.doc();
if (comment != null) option.put(ColumnDef.Option.COMMENT, comment);
ColumnDef colDef = new ColumnDef(field.name(),
new TypeDef(getTypeInfo(field.schema(), field.name())), field.doc());
colDef.analyze();
new TypeDef(getTypeInfo(field.schema(), field.name())), option);
colDef.analyze(null);
colDefs.add(colDef);
}
return colDefs;
@@ -201,4 +205,4 @@ public class AvroSchemaParser {
}
return propValue;
}
}
}

View File

@@ -23,19 +23,19 @@ import java.net.URL;
import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.impala.analysis.ColumnDef;
import org.apache.impala.catalog.PrimitiveType;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
/**
* Contains utility functions for dealing with Avro schemas.
*/
@@ -139,10 +139,13 @@ public class AvroSchemaUtils {
if ((colDef.getType().isStringType() && avroCol.getType().isStringType())) {
Preconditions.checkState(
avroCol.getType().getPrimitiveType() == PrimitiveType.STRING);
Map<ColumnDef.Option, Object> option = Maps.newHashMap();
String comment = avroCol.getComment();
if (comment != null) option.put(ColumnDef.Option.COMMENT, comment);
ColumnDef reconciledColDef = new ColumnDef(
avroCol.getColName(), colDef.getTypeDef(), avroCol.getComment());
avroCol.getColName(), colDef.getTypeDef(), option);
try {
reconciledColDef.analyze();
reconciledColDef.analyze(null);
} catch (AnalysisException e) {
Preconditions.checkNotNull(
null, "reconciledColDef.analyze() should never throw.");

View File

@@ -29,18 +29,26 @@ import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TExpr;
import org.apache.impala.thrift.TExprNode;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.analysis.Expr;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TColumnEncoding;
import org.apache.impala.thrift.THdfsCompression;
import com.google.common.base.Splitter;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnSchema.Encoding;
import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
import org.apache.kudu.Schema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduClient.KuduClientBuilder;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RangePartitionBound;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
public class KuduUtil {
private static final String KUDU_TABLE_NAME_PREFIX = "impala::";
@@ -138,6 +146,145 @@ public class KuduUtil {
}
}
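/**
 * Returns the literal in 'defaultValue' converted to the Java object type that the
 * Kudu client expects for the given Kudu column type. Throws an ImpalaRuntimeException
 * if the literal does not match the column type or the type is not supported for
 * default values.
 */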
public static Object getKuduDefaultValue(TExpr defaultValue,
org.apache.kudu.Type type, String colName) throws ImpalaRuntimeException {
Preconditions.checkState(defaultValue.getNodes().size() == 1);
TExprNode literal = defaultValue.getNodes().get(0);
switch (type) {
case INT8:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return (byte) literal.getInt_literal().getValue();
case INT16:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return (short) literal.getInt_literal().getValue();
case INT32:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return (int) literal.getInt_literal().getValue();
case INT64:
checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
return (long) literal.getInt_literal().getValue();
case FLOAT:
checkCorrectType(literal.isSetFloat_literal(), type, colName, literal);
return (float) literal.getFloat_literal().getValue();
case DOUBLE:
checkCorrectType(literal.isSetFloat_literal(), type, colName, literal);
return (double) literal.getFloat_literal().getValue();
case STRING:
checkCorrectType(literal.isSetString_literal(), type, colName, literal);
return literal.getString_literal().getValue();
default:
throw new ImpalaRuntimeException("Unsupported value for column type: " +
type.toString());
}
}
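/** Converts a thrift TColumnEncoding to the corresponding Kudu client Encoding. */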
public static Encoding fromThrift(TColumnEncoding encoding)
throws ImpalaRuntimeException {
switch (encoding) {
case AUTO:
return Encoding.AUTO_ENCODING;
case PLAIN:
return Encoding.PLAIN_ENCODING;
case PREFIX:
return Encoding.PREFIX_ENCODING;
case GROUP_VARINT:
return Encoding.GROUP_VARINT;
case RLE:
return Encoding.RLE;
case DICTIONARY:
return Encoding.DICT_ENCODING;
case BIT_SHUFFLE:
return Encoding.BIT_SHUFFLE;
default:
throw new ImpalaRuntimeException("Unsupported encoding: " +
encoding.toString());
}
}
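/** Converts a Kudu client Encoding to the corresponding thrift TColumnEncoding. */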
public static TColumnEncoding toThrift(Encoding encoding)
throws ImpalaRuntimeException {
switch (encoding) {
case AUTO_ENCODING:
return TColumnEncoding.AUTO;
case PLAIN_ENCODING:
return TColumnEncoding.PLAIN;
case PREFIX_ENCODING:
return TColumnEncoding.PREFIX;
case GROUP_VARINT:
return TColumnEncoding.GROUP_VARINT;
case RLE:
return TColumnEncoding.RLE;
case DICT_ENCODING:
return TColumnEncoding.DICTIONARY;
case BIT_SHUFFLE:
return TColumnEncoding.BIT_SHUFFLE;
default:
throw new ImpalaRuntimeException("Unsupported encoding: " +
encoding.toString());
}
}
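/** Converts a thrift THdfsCompression to the Kudu client CompressionAlgorithm. */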
public static CompressionAlgorithm fromThrift(THdfsCompression compression)
throws ImpalaRuntimeException {
switch (compression) {
case DEFAULT:
return CompressionAlgorithm.DEFAULT_COMPRESSION;
case NONE:
return CompressionAlgorithm.NO_COMPRESSION;
case SNAPPY:
return CompressionAlgorithm.SNAPPY;
case LZ4:
return CompressionAlgorithm.LZ4;
case ZLIB:
return CompressionAlgorithm.ZLIB;
default:
throw new ImpalaRuntimeException("Unsupported compression algorithm: " +
compression.toString());
}
}
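/** Converts a Kudu client CompressionAlgorithm to its thrift THdfsCompression value. */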
public static THdfsCompression toThrift(CompressionAlgorithm compression)
throws ImpalaRuntimeException {
switch (compression) {
case NO_COMPRESSION:
return THdfsCompression.NONE;
case DEFAULT_COMPRESSION:
return THdfsCompression.DEFAULT;
case SNAPPY:
return THdfsCompression.SNAPPY;
case LZ4:
return THdfsCompression.LZ4;
case ZLIB:
return THdfsCompression.ZLIB;
default:
throw new ImpalaRuntimeException("Unsupported compression algorithm: " +
compression.toString());
}
}
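/**
 * Populates the Kudu-specific option fields of 'column' (key, nullability, encoding,
 * compression, default value, block size) and returns it. Null option arguments are
 * left unset; a non-null default value must be a LiteralExpr.
 */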
public static TColumn setColumnOptions(TColumn column, boolean isKey,
Boolean isNullable, Encoding encoding, CompressionAlgorithm compression,
Expr defaultValue, Integer blockSize) {
column.setIs_key(isKey);
if (isNullable != null) column.setIs_nullable(isNullable);
try {
if (encoding != null) column.setEncoding(toThrift(encoding));
if (compression != null) column.setCompression(toThrift(compression));
} catch (ImpalaRuntimeException e) {
// This shouldn't happen
throw new IllegalStateException(String.format("Error parsing " +
"encoding/compression values for Kudu column '%s': %s", column.getColumnName(),
e.getMessage()));
}
if (defaultValue != null) {
Preconditions.checkState(defaultValue instanceof LiteralExpr);
column.setDefault_value(defaultValue.treeToThrift());
}
if (blockSize != null) column.setBlock_size(blockSize);
return column;
}
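
A hedged sketch of how a frontend caller might use the new setColumnOptions() helper;
the TColumn setup and option values below are assumptions for illustration, not code
from this patch:

// Suppose analysis resolved these options for column 'x'.
TColumn col = new TColumn();
col.setColumnName("x");
KuduUtil.setColumnOptions(col, /*isKey*/ true, /*isNullable*/ false,
    Encoding.RLE, CompressionAlgorithm.LZ4, /*defaultValue*/ null, /*blockSize*/ 4096);
// 'col' now carries is_key, is_nullable, encoding, compression and block_size
// for the catalog and backend to consume.
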
/**
* If correctType is true, returns. Otherwise throws a formatted error message
* indicating a problem with the type of the default or range partition literal.

View File

@@ -68,6 +68,7 @@ import org.apache.impala.analysis.SqlParserSymbols;
keywordMap.put("between", new Integer(SqlParserSymbols.KW_BETWEEN));
keywordMap.put("bigint", new Integer(SqlParserSymbols.KW_BIGINT));
keywordMap.put("binary", new Integer(SqlParserSymbols.KW_BINARY));
keywordMap.put("block_size", new Integer(SqlParserSymbols.KW_BLOCKSIZE));
keywordMap.put("boolean", new Integer(SqlParserSymbols.KW_BOOLEAN));
keywordMap.put("buckets", new Integer(SqlParserSymbols.KW_BUCKETS));
keywordMap.put("by", new Integer(SqlParserSymbols.KW_BY));
@@ -82,6 +83,7 @@ import org.apache.impala.analysis.SqlParserSymbols;
keywordMap.put("column", new Integer(SqlParserSymbols.KW_COLUMN));
keywordMap.put("columns", new Integer(SqlParserSymbols.KW_COLUMNS));
keywordMap.put("comment", new Integer(SqlParserSymbols.KW_COMMENT));
keywordMap.put("compression", new Integer(SqlParserSymbols.KW_COMPRESSION));
keywordMap.put("compute", new Integer(SqlParserSymbols.KW_COMPUTE));
keywordMap.put("create", new Integer(SqlParserSymbols.KW_CREATE));
keywordMap.put("cross", new Integer(SqlParserSymbols.KW_CROSS));
@@ -92,6 +94,7 @@ import org.apache.impala.analysis.SqlParserSymbols;
keywordMap.put("date", new Integer(SqlParserSymbols.KW_DATE));
keywordMap.put("datetime", new Integer(SqlParserSymbols.KW_DATETIME));
keywordMap.put("decimal", new Integer(SqlParserSymbols.KW_DECIMAL));
keywordMap.put("default", new Integer(SqlParserSymbols.KW_DEFAULT));
keywordMap.put("delete", new Integer(SqlParserSymbols.KW_DELETE));
keywordMap.put("delimited", new Integer(SqlParserSymbols.KW_DELIMITED));
keywordMap.put("desc", new Integer(SqlParserSymbols.KW_DESC));
@@ -102,6 +105,7 @@ import org.apache.impala.analysis.SqlParserSymbols;
keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE));
keywordMap.put("drop", new Integer(SqlParserSymbols.KW_DROP));
keywordMap.put("else", new Integer(SqlParserSymbols.KW_ELSE));
keywordMap.put("encoding", new Integer(SqlParserSymbols.KW_ENCODING));
keywordMap.put("end", new Integer(SqlParserSymbols.KW_END));
keywordMap.put("escaped", new Integer(SqlParserSymbols.KW_ESCAPED));
keywordMap.put("exists", new Integer(SqlParserSymbols.KW_EXISTS));

View File

@@ -48,6 +48,8 @@ import org.apache.impala.common.FrontendTestBase;
import org.apache.impala.common.RuntimeEnv;
import org.apache.impala.testutil.TestUtils;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
import org.apache.kudu.ColumnSchema.Encoding;
import org.junit.Test;
import com.google.common.base.Joiner;
@@ -1615,6 +1617,9 @@ public class AnalyzeDDLTest extends FrontendTestBase {
// Supported file formats. Exclude Avro since it is tested separately.
String [] fileFormats =
{"TEXTFILE", "SEQUENCEFILE", "PARQUET", "PARQUETFILE", "RCFILE"};
String [] fileFormatsStr =
{"TEXT", "SEQUENCE_FILE", "PARQUET", "PARQUET", "RC_FILE"};
int formatIndx = 0;
for (String format: fileFormats) {
for (String create: ImmutableList.of("create table", "create external table")) {
AnalyzesOk(String.format("%s new_table (i int) " +
@@ -1625,9 +1630,11 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"Table requires at least 1 column");
}
AnalysisError(String.format("create table t (i int primary key) stored as %s",
format), "Only Kudu tables can specify a PRIMARY KEY");
format), String.format("Unsupported column options for file format " +
"'%s': 'i INT PRIMARY KEY'", fileFormatsStr[formatIndx]));
AnalysisError(String.format("create table t (i int, primary key(i)) stored as %s",
format), "Only Kudu tables can specify a PRIMARY KEY");
formatIndx++;
}
// Note: Backslashes need to be escaped twice - once for Java and once for Impala.
@@ -1986,7 +1993,7 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"partition 10 < values <= 30, partition 30 < values) " +
"stored as kudu tblproperties('kudu.master_addresses'='127.0.0.1')");
// Not using the STORED AS KUDU syntax to specify a Kudu table
AnalysisError("create table tab (x int primary key) tblproperties (" +
AnalysisError("create table tab (x int) tblproperties (" +
"'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')",
CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE);
AnalysisError("create table tab (x int primary key) stored as kudu tblproperties (" +
@@ -2035,6 +2042,86 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"distribute by hash(x) into 3 buckets stored as kudu", t);
AnalysisError(stmt, expectedError);
}
// Test column options
String[] nullability = {"not null", "null", ""};
String[] defaultVal = {"default 10", ""};
String[] blockSize = {"block_size 4096", ""};
for (Encoding enc: Encoding.values()) {
for (CompressionAlgorithm comp: CompressionAlgorithm.values()) {
for (String nul: nullability) {
for (String def: defaultVal) {
for (String block: blockSize) {
AnalyzesOk(String.format("create table tab (x int primary key " +
"not null encoding %s compression %s %s %s, y int encoding %s " +
"compression %s %s %s %s) distribute by hash (x) " +
"into 3 buckets stored as kudu", enc, comp, def, block, enc,
comp, def, nul, block));
}
}
}
}
}
// Primary key specified using the PRIMARY KEY clause
AnalyzesOk("create table tab (x int not null encoding plain_encoding " +
"compression snappy block_size 1, y int null encoding rle compression lz4 " +
"default 1, primary key(x)) distribute by hash (x) into 3 buckets " +
"stored as kudu");
// Primary keys can't be null
AnalysisError("create table tab (x int primary key null, y int not null) " +
"distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
"cannot be nullable: x INT PRIMARY KEY NULL");
AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " +
"distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " +
"cannot be nullable: y INT NULL");
// Unsupported encoding value
AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " +
"distribute by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " +
"value 'INVALID_ENC'. Supported encoding values are: " +
Joiner.on(", ").join(Encoding.values()));
// Unsupported compression algorithm
AnalysisError("create table tab (x int primary key, y int compression " +
"invalid_comp) distribute by hash (x) into 3 buckets stored as kudu",
"Unsupported compression algorithm 'INVALID_COMP'. Supported compression " +
"algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values()));
// Default values
AnalyzesOk("create table tab (i1 tinyint default 1, i2 smallint default 10, " +
"i3 int default 100, i4 bigint default 1000, vals string default 'test', " +
"valf float default cast(1.2 as float), vald double default " +
"cast(3.1452 as double), valb boolean default true, " +
"primary key (i1, i2, i3, i4, vals)) distribute by hash (i1) into 3 " +
"buckets stored as kudu");
AnalyzesOk("create table tab (i int primary key default 1+1+1) " +
"distribute by hash (i) into 3 buckets stored as kudu");
AnalyzesOk("create table tab (i int primary key default factorial(5)) " +
"distribute by hash (i) into 3 buckets stored as kudu");
AnalyzesOk("create table tab (i int primary key, x int null default " +
"isnull(null, null)) distribute by hash (i) into 3 buckets stored as kudu");
// Invalid default values
AnalysisError("create table tab (i int primary key default 'string_val') " +
"distribute by hash (i) into 3 buckets stored as kudu", "Default value " +
"'string_val' (type: STRING) is not compatible with column 'i' (type: INT).");
AnalysisError("create table tab (i int primary key, x int default 1.1) " +
"distribute by hash (i) into 3 buckets stored as kudu",
"Default value 1.1 (type: DECIMAL(2,1)) is not compatible with column " +
"'x' (type: INT).");
AnalysisError("create table tab (i tinyint primary key default 128) " +
"distribute by hash (i) into 3 buckets stored as kudu", "Default value " +
"128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT).");
AnalysisError("create table tab (i int primary key default isnull(null, null)) " +
"distribute by hash (i) into 3 buckets stored as kudu", "Default value of " +
"NULL not allowed on non-nullable column: 'i'");
AnalysisError("create table tab (i int primary key, x int not null " +
"default isnull(null, null)) distribute by hash (i) into 3 buckets " +
"stored as kudu", "Default value of NULL not allowed on non-nullable column: " +
"'x'");
// Invalid block_size values
AnalysisError("create table tab (i int primary key block_size 1.1) " +
"distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " +
"for BLOCK_SIZE: 1.1. A positive INTEGER value is expected.");
AnalysisError("create table tab (i int primary key block_size 'val') " +
"distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " +
"for BLOCK_SIZE: 'val'. A positive INTEGER value is expected.");
}
@Test
@@ -2279,11 +2366,12 @@ public class AnalyzeDDLTest extends FrontendTestBase {
"Type 'STRUCT<f1:INT>' is not supported as partition-column type in column: x");
// Kudu specific clauses used in an Avro table.
AnalysisError("create table functional.new_table (i int primary key) " +
AnalysisError("create table functional.new_table (i int) " +
"distribute by hash(i) into 3 buckets stored as avro",
"Only Kudu tables can use the DISTRIBUTE BY clause.");
AnalysisError("create table functional.new_table (i int primary key) " +
"stored as avro", "Only Kudu tables can specify a PRIMARY KEY.");
"stored as avro", "Unsupported column options for file format 'AVRO': " +
"'i INT PRIMARY KEY'");
}
@Test

View File

@@ -929,8 +929,8 @@ public class ParserTest extends FrontendTestBase {
@Test
public void TestIdentQuoting() {
ParsesOk("select a from `t`");
ParsesOk("select a from `default`.`t`");
ParsesOk("select a from `default`.t");
ParsesOk("select a from default.`t`");
ParsesOk("select a from default.t");
ParsesOk("select a from default.`t`");
ParsesOk("select 01a from default.`01_t`");
@@ -962,7 +962,7 @@ public class ParserTest extends FrontendTestBase {
// Quoted identifiers can contain any characters except "`".
ParsesOk("select a from `all types`");
ParsesOk("select a from `default`.`all types`");
ParsesOk("select a from default.`all types`");
ParsesOk("select a from `~!@#$%^&*()-_=+|;:'\",<.>/?`");
// Quoted identifiers do not unescape escape sequences.
ParsesOk("select a from `ab\rabc`");
@@ -1676,7 +1676,7 @@ public class ParserTest extends FrontendTestBase {
@Test
public void TestKuduUpdate() {
TestUtils.assumeKuduIsSupported();
//TestUtils.assumeKuduIsSupported();
ParserError("update (select * from functional_kudu.testtbl) a set name = '10'");
}
@@ -2456,6 +2456,51 @@ public class ParserTest extends FrontendTestBase {
"(PARTITION VALUES = 10) STORED AS KUDU");
ParserError("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " +
"(PARTITION 10 < VALUE < 20) STORED AS KUDU");
// Column options for Kudu tables
String[] encodings = {"encoding auto_encoding", "encoding plain_encoding",
"encoding prefix_encoding", "encoding group_varint", "encoding rle",
"encoding dict_encoding", "encoding bit_shuffle", "encoding unknown", ""};
String[] compression = {"compression default_compression",
"compression no_compression", "compression snappy", "compression lz4",
"compression zlib", "compression unknown", ""};
String[] nullability = {"not null", "null", ""};
String[] defaultVal = {"default 10", ""};
String[] blockSize = {"block_size 4096", ""};
for (String enc: encodings) {
for (String comp: compression) {
for (String nul: nullability) {
for (String def: defaultVal) {
for (String block: blockSize) {
ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " +
"%s %s %s %s %s) STORED AS KUDU", nul, enc, comp, def, block));
ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " +
"%s %s %s %s %s) STORED AS KUDU", block, nul, enc, comp, def));
ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " +
"%s %s %s %s %s) STORED AS KUDU", def, block, nul, enc, comp));
ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " +
"%s %s %s %s %s) STORED AS KUDU", comp, def, block, nul, enc));
ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " +
"%s %s %s %s %s) STORED AS KUDU", enc, comp, def, block, nul));
ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " +
"%s %s %s %s %s) STORED AS KUDU", enc, comp, block, def, nul));
}
}
}
}
}
// Column option is specified multiple times for the same column
ParserError("CREATE TABLE Foo(a int PRIMARY KEY ENCODING RLE ENCODING PLAIN) " +
"STORED AS KUDU");
// Constant expr used in DEFAULT
ParsesOk("CREATE TABLE Foo(a int PRIMARY KEY, b int DEFAULT 1+1) STORED AS KUDU");
ParsesOk("CREATE TABLE Foo(a int PRIMARY KEY, b float DEFAULT cast(1.1 as float)) " +
"STORED AS KUDU");
// Non-literal value used in BLOCK_SIZE
ParserError("CREATE TABLE Foo(a int PRIMARY KEY, b int BLOCK_SIZE 1+1) " +
"STORED AS KUDU");
ParserError("CREATE TABLE Foo(a int PRIMARY KEY BLOCK_SIZE -1) STORED AS KUDU");
}
@Test
@@ -2886,7 +2931,7 @@ public class ParserTest extends FrontendTestBase {
"select from t\n" +
" ^\n" +
"Encountered: FROM\n" +
"Expected: ALL, CASE, CAST, DISTINCT, EXISTS, " +
"Expected: ALL, CASE, CAST, DEFAULT, DISTINCT, EXISTS, " +
"FALSE, IF, INTERVAL, NOT, NULL, " +
"STRAIGHT_JOIN, TRUNCATE, TRUE, IDENTIFIER\n");
@@ -2896,8 +2941,8 @@ public class ParserTest extends FrontendTestBase {
"select c, b, c where a = 5\n" +
" ^\n" +
"Encountered: WHERE\n" +
"Expected: AND, AS, BETWEEN, DIV, FROM, ILIKE, IN, IREGEXP, IS, LIKE, LIMIT, NOT, OR, " +
"ORDER, REGEXP, RLIKE, UNION, COMMA, IDENTIFIER\n");
"Expected: AND, AS, BETWEEN, DEFAULT, DIV, FROM, ILIKE, IN, IREGEXP, IS, LIKE, " +
"LIMIT, NOT, OR, ORDER, REGEXP, RLIKE, UNION, COMMA, IDENTIFIER\n");
// missing table list
ParserError("select c, b, c from where a = 5",
@@ -2905,7 +2950,7 @@ public class ParserTest extends FrontendTestBase {
"select c, b, c from where a = 5\n" +
" ^\n" +
"Encountered: WHERE\n" +
"Expected: IDENTIFIER\n");
"Expected: DEFAULT, IDENTIFIER\n");
// missing predicate in where clause (no group by)
ParserError("select c, b, c from t where",
@@ -2913,7 +2958,7 @@ public class ParserTest extends FrontendTestBase {
"select c, b, c from t where\n" +
" ^\n" +
"Encountered: EOF\n" +
"Expected: CASE, CAST, EXISTS, FALSE, " +
"Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " +
"IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n");
// missing predicate in where clause (group by)
@@ -2922,7 +2967,7 @@ public class ParserTest extends FrontendTestBase {
"select c, b, c from t where group by a, b\n" +
" ^\n" +
"Encountered: GROUP\n" +
"Expected: CASE, CAST, EXISTS, FALSE, " +
"Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " +
"IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n");
// unmatched string literal starting with "
@@ -2983,7 +3028,7 @@ public class ParserTest extends FrontendTestBase {
"...c,c,c,c,c,c,c,c,cd,c,d,d, ,c, from t\n" +
" ^\n" +
"Encountered: COMMA\n" +
"Expected: CASE, CAST, EXISTS, FALSE, " +
"Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " +
"IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n");
// Parsing identifiers that have different names printed as EXPECTED
@@ -3004,7 +3049,7 @@ public class ParserTest extends FrontendTestBase {
"USE ` `\n" +
" ^\n" +
"Encountered: EMPTY IDENTIFIER\n" +
"Expected: IDENTIFIER\n");
"Expected: DEFAULT, IDENTIFIER\n");
// Expecting = token
ParserError("SET foo",

View File

@@ -551,20 +551,20 @@ DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}_idx;
CREATE TABLE {db_name}{db_suffix}.{table_name}_idx (
kudu_idx BIGINT PRIMARY KEY,
id INT,
bool_col BOOLEAN,
tinyint_col TINYINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col STRING,
string_col STRING,
timestamp_col STRING,
year INT,
month INT,
day INT
id INT NULL,
bool_col BOOLEAN NULL,
tinyint_col TINYINT NULL,
smallint_col SMALLINT NULL,
int_col INT NULL,
bigint_col BIGINT NULL,
float_col FLOAT NULL,
double_col DOUBLE NULL,
date_string_col STRING NULL,
string_col STRING NULL,
timestamp_col STRING NULL,
year INT NULL,
month INT NULL,
day INT NULL
)
DISTRIBUTE BY HASH (kudu_idx) INTO 3 BUCKETS STORED AS KUDU;
CREATE VIEW {db_name}{db_suffix}.{table_name} AS
@@ -763,8 +763,8 @@ delimited fields terminated by ',' escaped by '\\'
DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name};
create table {db_name}{db_suffix}.{table_name} (
id bigint primary key,
name string,
zip int
name string null,
zip int null
)
distribute by range(id) (partition values <= 1003, partition 1003 < values <= 1007,
partition 1007 < values) stored as kudu;
@@ -1310,7 +1310,8 @@ OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
---- CREATE_KUDU
DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name};
create table {db_name}{db_suffix}.{table_name} (
a string primary key, b string, c string, d int, e double, f string, g string
a string primary key, b string null, c string null, d int null, e double null,
f string null, g string null
)
distribute by hash(a) into 3 buckets stored as kudu;
====
@@ -1412,10 +1413,10 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/ImpalaDemoDataset/DEC_00_SF3_P077
DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name};
create table {db_name}{db_suffix}.{table_name} (
id string primary key,
zip string,
description1 string,
description2 string,
income int)
zip string null,
description1 string null,
description2 string null,
income int null)
distribute by range(id)
(partition values <= '8600000US01475',
partition '8600000US01475' < values <= '8600000US63121',

View File

@@ -1,8 +1,8 @@
====
---- QUERY
create table tdata
(id int primary key, valf float, vali bigint, valv string, valb boolean, valt tinyint,
vals smallint, vald double)
(id int primary key, valf float null, vali bigint null, valv string null,
valb boolean null, valt tinyint null, vals smallint null, vald double null)
DISTRIBUTE BY RANGE (PARTITION VALUES < 100, PARTITION 100 <= VALUES < 1000,
PARTITION 1000 <= VALUES <= 10000) STORED AS KUDU
---- RESULTS
@@ -297,8 +297,9 @@ INT,FLOAT,BIGINT,STRING,BOOLEAN,TINYINT,SMALLINT,DOUBLE
====
---- QUERY
create table multiple_key_cols
(string_col string, bigint_col bigint, tinyint_col tinyint, smallint_col smallint,
bool_col boolean, int_col int, double_col double, float_col float,
(string_col string, bigint_col bigint, tinyint_col tinyint,
smallint_col smallint, bool_col boolean null, int_col int null,
double_col double null, float_col float null,
primary key (string_col, bigint_col, tinyint_col, smallint_col))
DISTRIBUTE BY HASH (string_col) INTO 16 BUCKETS STORED AS KUDU
====

View File

@@ -1,8 +1,8 @@
====
---- QUERY
create table tdata
(id int primary key, valf float, vali bigint, valv string, valb boolean, valt tinyint,
vals smallint, vald double)
(id int primary key, valf float null, vali bigint null, valv string null,
valb boolean null, valt tinyint null, vals smallint null, vald double null)
DISTRIBUTE BY RANGE (PARTITION VALUES < 10, PARTITION 10 <= VALUES < 30,
PARTITION 30 <= VALUES) STORED AS KUDU
---- RESULTS

View File

@@ -1,8 +1,9 @@
====
---- QUERY
create table tdata
(id int primary key, name string, valf float, vali bigint, valv string, valb boolean,
valt tinyint, vals smallint, vald double)
(id int primary key, name string null, valf float null, vali bigint null,
valv string null, valb boolean null, valt tinyint null, vals smallint null,
vald double null)
DISTRIBUTE BY RANGE (PARTITION VALUES < 10, PARTITION 10 <= VALUES < 30,
PARTITION 30 <= VALUES <= 10000) STORED AS KUDU
---- RESULTS
@@ -337,4 +338,4 @@ update tdata set vali = -1
---- RUNTIME_PROFILE
NumModifiedRows: 7300
NumRowErrors: 0
====
====

View File

@@ -1,8 +1,9 @@
====
---- QUERY
create table tdata
(id int primary key, name string, valf float, vali bigint, valv string, valb boolean,
valt tinyint, vals smallint, vald double)
(id int primary key, name string null, valf float null, vali bigint null,
valv string null, valb boolean null, valt tinyint null, vals smallint null,
vald double null)
DISTRIBUTE BY RANGE (PARTITION VALUES < 10, PARTITION 10 <= VALUES < 30,
PARTITION 30 <= VALUES) STORED AS KUDU
---- RESULTS
@@ -389,8 +390,8 @@ NumRowErrors: 0
---- QUERY
create table multiple_key_cols
(string_col string, bigint_col bigint, tinyint_col tinyint, smallint_col smallint,
bool_col boolean, int_col int, double_col double, float_col float,
primary key (string_col, bigint_col, tinyint_col, smallint_col))
bool_col boolean null, int_col int null, double_col double null,
float_col float null, primary key (string_col, bigint_col, tinyint_col, smallint_col))
DISTRIBUTE BY HASH (string_col) INTO 16 BUCKETS STORED AS KUDU
====
---- QUERY
@@ -488,4 +489,4 @@ upsert into table multiple_key_cols
(string_col, tinyint_col, smallint_col) values ('a', 1, 1)
---- CATCH
All primary key columns must be specified for UPSERTing into Kudu tables. Missing columns are: bigint_col
====
====

View File

@@ -64,6 +64,27 @@ class TestKuduOperations(KuduTestSuite):
def test_kudu_stats(self, vector, unique_database):
self.run_test_case('QueryTest/kudu_stats', vector, use_db=unique_database)
def test_kudu_column_options(self, cursor, kudu_client, unique_database):
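# Exercises all combinations of the new Kudu column options in CREATE TABLE and
# verifies that the corresponding Kudu table gets created.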
encodings = ["ENCODING PLAIN_ENCODING", ""]
compressions = ["COMPRESSION SNAPPY", ""]
nullability = ["NOT NULL", "NULL", ""]
defaults = ["DEFAULT 1", ""]
blocksizes = ["BLOCK_SIZE 32768", ""]
indx = 1
for encoding in encodings:
for compression in compressions:
for default in defaults:
for blocksize in blocksizes:
for nullable in nullability:
impala_tbl_name = "test_column_options_%s" % str(indx)
cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY
%s %s %s %s, b INT %s %s %s %s %s) DISTRIBUTE BY HASH (a) INTO 3
BUCKETS STORED AS KUDU""" % (unique_database, impala_tbl_name,
encoding, compression, default, blocksize, nullable, encoding,
compression, default, blocksize))
indx = indx + 1
kudu_tbl_name = "impala::%s.%s" % (unique_database, impala_tbl_name)
assert kudu_client.table_exists(kudu_tbl_name)
class TestCreateExternalTable(KuduTestSuite):
@@ -228,13 +249,14 @@ class TestShowCreateTable(KuduTestSuite):
def test_primary_key_and_distribution(self, cursor):
# TODO: Add test cases with column comments once KUDU-1711 is fixed.
# TODO: Add case with BLOCK_SIZE
self.assert_show_create_equals(cursor,
"""
CREATE TABLE {table} (c INT PRIMARY KEY)
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS STORED AS KUDU""",
"""
CREATE TABLE {db}.{{table}} (
c INT,
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
@@ -243,14 +265,14 @@ class TestShowCreateTable(KuduTestSuite):
db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
self.assert_show_create_equals(cursor,
"""
CREATE TABLE {table} (c INT PRIMARY KEY, d STRING)
CREATE TABLE {table} (c INT PRIMARY KEY, d STRING NULL)
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c)
(PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2,
PARTITION 2 < VALUES) STORED AS KUDU""",
"""
CREATE TABLE {db}.{{table}} (
c INT,
d STRING,
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
d STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) (...)
@@ -259,11 +281,11 @@ class TestShowCreateTable(KuduTestSuite):
db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
self.assert_show_create_equals(cursor,
"""
CREATE TABLE {table} (c INT, PRIMARY KEY (c))
CREATE TABLE {table} (c INT ENCODING PLAIN_ENCODING, PRIMARY KEY (c))
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS STORED AS KUDU""",
"""
CREATE TABLE {db}.{{table}} (
c INT,
c INT NOT NULL ENCODING PLAIN_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
@@ -272,14 +294,14 @@ class TestShowCreateTable(KuduTestSuite):
db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
self.assert_show_create_equals(cursor,
"""
CREATE TABLE {table} (c INT, d STRING, PRIMARY KEY(c, d))
CREATE TABLE {table} (c INT COMPRESSION LZ4, d STRING, PRIMARY KEY(c, d))
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS,
RANGE (c, d) (PARTITION VALUE = (1, 'aaa'), PARTITION VALUE = (2, 'bbb'))
STORED AS KUDU""",
"""
CREATE TABLE {db}.{{table}} (
c INT,
d STRING,
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,
d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c, d)
)
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, RANGE (c, d) (...)
@@ -288,14 +310,14 @@ class TestShowCreateTable(KuduTestSuite):
db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
self.assert_show_create_equals(cursor,
"""
CREATE TABLE {table} (c INT, d STRING, e INT, PRIMARY KEY(c, d))
CREATE TABLE {table} (c INT, d STRING, e INT NULL DEFAULT 10, PRIMARY KEY(c, d))
DISTRIBUTE BY RANGE (c) (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2,
PARTITION 2 < VALUES <= 3, PARTITION 3 < VALUES) STORED AS KUDU""",
"""
CREATE TABLE {db}.{{table}} (
c INT,
d STRING,
e INT,
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
e INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT 10,
PRIMARY KEY (c, d)
)
DISTRIBUTE BY RANGE (c) (...)
@@ -316,7 +338,7 @@ class TestShowCreateTable(KuduTestSuite):
TBLPROPERTIES ({props})""".format(props=props),
"""
CREATE TABLE {db}.{{table}} (
c INT,
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
@@ -335,7 +357,7 @@ class TestShowCreateTable(KuduTestSuite):
TBLPROPERTIES ({props})""".format(props=props),
"""
CREATE TABLE {db}.{{table}} (
c INT,
c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
PRIMARY KEY (c)
)
DISTRIBUTE BY HASH (c) INTO 3 BUCKETS

View File

@@ -478,7 +478,7 @@ class TestImpalaShell(ImpalaTestSuite):
def test_kudu_dml_reporting(self, unique_database):
db = unique_database
run_impala_shell_cmd('--query="create table %s.dml_test (id int primary key, '\
'age int) distribute by hash(id) into 2 buckets stored as kudu"' % db)
'age int null) distribute by hash(id) into 2 buckets stored as kudu"' % db)
self._validate_dml_stmt("insert into %s.dml_test (id) values (7), (7)" % db, 1, 1)
self._validate_dml_stmt("insert into %s.dml_test (id) values (7)" % db, 0, 1)