From 3db5ced4cee8967f28e337747efba902ce71cfee Mon Sep 17 00:00:00 2001
From: Dimitris Tsirogiannis
Date: Wed, 9 Nov 2016 15:11:07 -0800
Subject: [PATCH] IMPALA-3726: Add support for Kudu-specific column options

This commit adds support for Kudu-specific column options in CREATE TABLE
statements. The syntax is:

  CREATE TABLE tbl_name ([col_name type [PRIMARY KEY] [option [...]]] [, ...])

where option is:
    NULL
  | NOT NULL
  | ENCODING encoding_val
  | COMPRESSION compression_algorithm
  | DEFAULT expr
  | BLOCK_SIZE num

The output of the SHOW CREATE TABLE statement was altered to include all the
specified column options for Kudu tables.
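To make the new syntax concrete, a statement like the following should parse
under this grammar (an illustrative sketch only, not part of the tested
changes: the table and column names are made up, and the ENCODING and
COMPRESSION arguments are identifiers that must match the Kudu client enum
values, e.g. PLAIN_ENCODING or LZ4):

  CREATE TABLE inventory (
    id BIGINT PRIMARY KEY ENCODING AUTO_ENCODING COMPRESSION LZ4 BLOCK_SIZE 8192,
    qty INT NOT NULL DEFAULT 0,
    note STRING NULL ENCODING PLAIN_ENCODING COMMENT 'optional note'
  ) DISTRIBUTE BY HASH (id) INTO 3 BUCKETS STORED AS KUDU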
Change-Id: I727b9ae1b7b2387db752b58081398dd3f3449c02
Reviewed-on: http://gerrit.cloudera.org:8080/5026
Reviewed-by: Dimitris Tsirogiannis
Tested-by: Internal Jenkins
---
 be/src/exec/kudu-table-sink.cc                  |   6 +-
 common/thrift/CatalogObjects.thrift             |  20 +-
 fe/src/main/cup/sql-parser.cup                  | 240 +++++++++++++-----
 .../AlterTableAddReplaceColsStmt.java           |   2 +-
 .../analysis/AlterTableChangeColStmt.java       |   2 +-
 .../org/apache/impala/analysis/ColumnDef.java   | 196 ++++++++++++--
 .../analysis/CreateOrAlterViewStmtBase.java     |   5 +-
 .../analysis/CreateTableAsSelectStmt.java       |   9 +-
 .../analysis/CreateTableLikeFileStmt.java       |   8 +-
 .../apache/impala/analysis/InsertStmt.java      |  58 +++--
 .../org/apache/impala/analysis/TableDef.java    |  16 +-
 .../apache/impala/analysis/ToSqlUtils.java      |  16 ++
 .../org/apache/impala/catalog/Column.java       |  10 +-
 .../org/apache/impala/catalog/KuduColumn.java   |  86 ++++++-
 .../org/apache/impala/catalog/KuduTable.java    |  13 +-
 .../java/org/apache/impala/catalog/Table.java   |  17 +-
 .../org/apache/impala/planner/Planner.java      |   2 +
 .../impala/service/KuduCatalogOpExecutor.java   |  32 ++-
 .../apache/impala/util/AvroSchemaParser.java    |  18 +-
 .../apache/impala/util/AvroSchemaUtils.java     |  17 +-
 .../java/org/apache/impala/util/KuduUtil.java   | 157 +++++++++++-
 fe/src/main/jflex/sql-scanner.flex              |   4 +
 .../impala/analysis/AnalyzeDDLTest.java         |  96 ++++++-
 .../apache/impala/analysis/ParserTest.java      |  69 ++++-
 .../functional/functional_schema_template.sql   |  43 ++--
 .../queries/QueryTest/kudu_delete.test          |   9 +-
 .../queries/QueryTest/kudu_insert.test          |   4 +-
 .../queries/QueryTest/kudu_update.test          |   7 +-
 .../queries/QueryTest/kudu_upsert.test          |  11 +-
 tests/query_test/test_kudu.py                   |  52 ++--
 tests/shell/test_shell_commandline.py           |   2 +-
 31 files changed, 973 insertions(+), 254 deletions(-)

diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 5dd63367f..a195e840c 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -195,17 +195,13 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
     bool add_row = true;

     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
-      // For INSERT, output_expr_ctxs_ will contain all columns of the table in order.
-      // For UPDATE and UPSERT, output_expr_ctxs_ only contains the columns that the op
+      // output_expr_ctxs_ only contains the columns that the op
       // applies to, i.e. columns explicitly mentioned in the query, and
       // referenced_columns is then used to map to actual column positions.
       int col = kudu_table_sink_.referenced_columns.empty() ?
          j : kudu_table_sink_.referenced_columns[j];

      void* value = output_expr_ctxs_[j]->GetValue(current_row);
-
-      // If the value is NULL, we only need to explicitly set it for UPDATE and UPSERT.
-      // For INSERT, it can be ignored as unspecified cols will be implicitly set to NULL.
      if (value == NULL) {
        if (table_schema.Column(j).is_nullable()) {
          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),

diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 9f0c6d48c..10cb7771d 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -72,7 +72,18 @@ enum THdfsCompression {
   SNAPPY,
   SNAPPY_BLOCKED,
   LZO,
-  LZ4
+  LZ4,
+  ZLIB
+}
+
+enum TColumnEncoding {
+  AUTO,
+  PLAIN,
+  PREFIX,
+  GROUP_VARINT,
+  RLE,
+  DICTIONARY,
+  BIT_SHUFFLE
 }

 enum THdfsSeqCompressionMode {
@@ -191,11 +202,14 @@ struct TColumn {
   8: optional string column_qualifier
   9: optional bool is_binary

-  // Indicates whether this is a Kudu column. If true implies all following Kudu specific
-  // fields are set.
+  // All the following are Kudu-specific column properties
   10: optional bool is_kudu_column
   11: optional bool is_key
   12: optional bool is_nullable
+  13: optional TColumnEncoding encoding
+  14: optional THdfsCompression compression
+  15: optional Exprs.TExpr default_value
+  16: optional i32 block_size
 }

 // Represents a block in an HDFS file

diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 188403620..2fc765d87 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -18,15 +18,18 @@ package org.apache.impala.analysis;

 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java_cup.runtime.Symbol;
 import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.ColumnDef.Option;
 import org.apache.impala.analysis.UnionStmt.Qualifier;
 import org.apache.impala.analysis.UnionStmt.UnionOperand;
 import org.apache.impala.analysis.RangePartition;
@@ -240,16 +243,17 @@ parser code {:

 // ALL KEYWORDS ALSO NEED TO BE ADDED TO THE ident_or_kw PRODUCTION.
 terminal
   KW_ADD, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_ANALYTIC, KW_AND, KW_ANTI, KW_API_VERSION,
-  KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BOOLEAN,
-  KW_BUCKETS, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE, KW_CHAR,
-  KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPUTE, KW_CREATE,
-  KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME,
-  KW_DECIMAL, KW_DELETE, KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISTINCT, KW_DISTRIBUTE,
-  KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN,
-  KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN,
-  KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL,
-  KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF,
-  KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
+  KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BLOCKSIZE,
+  KW_BOOLEAN, KW_BUCKETS, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE,
+  KW_CHAR, KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPRESSION,
+  KW_COMPUTE, KW_CREATE, KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES,
+  KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE, KW_DELIMITED, KW_DESC,
+  KW_DESCRIBE, KW_DISTINCT, KW_DISTRIBUTE, KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE,
+  KW_ENCODING, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL,
+  KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT,
+  KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION,
+  KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF, KW_ILIKE,
+  KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
   KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN,
   KW_KUDU, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP,
   KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORDER,
@@ -280,6 +284,10 @@ terminal String STRING_LITERAL;
 terminal String UNMATCHED_STRING_LITERAL;
 terminal String UNEXPECTED_CHAR;

+// IMPALA-3726 introduced the DEFAULT keyword which could break existing applications
+// that use the identifier "DEFAULT" as database, column or table names. To avoid that,
+// the ident_or_default non-terminal is introduced and should be used instead of IDENT.
+nonterminal String ident_or_keyword, ident_or_default;
 nonterminal StatementBase stmt;
 // Single select statement.
 nonterminal SelectStmt select_stmt;
@@ -399,7 +407,6 @@ nonterminal CreateDataSrcStmt create_data_src_stmt;
 nonterminal DropDataSrcStmt drop_data_src_stmt;
 nonterminal ShowDataSrcsStmt show_data_srcs_stmt;
 nonterminal StructField struct_field_def;
-nonterminal String ident_or_keyword;
 nonterminal DistributeParam distribute_hash_param;
 nonterminal List<RangePartition> range_params_list;
 nonterminal RangePartition range_param;
@@ -415,7 +422,7 @@ nonterminal ArrayList<StructField> struct_field_def_list;
 // Options for DDL commands - CREATE/DROP/ALTER
 nonterminal HdfsCachingOp cache_op_val;
 nonterminal BigDecimal opt_cache_op_replication;
-nonterminal String comment_val;
+nonterminal String comment_val, opt_comment_val;
 nonterminal Boolean external_val;
 nonterminal Boolean purge_val;
 nonterminal String opt_init_string_val;
@@ -444,6 +451,13 @@ nonterminal String opt_kw_column;
 nonterminal String opt_kw_table;
 nonterminal Boolean overwrite_val;
 nonterminal Boolean cascade_val;
+nonterminal Boolean nullability_val;
+nonterminal String encoding_val;
+nonterminal String compression_val;
+nonterminal Expr default_val;
+nonterminal LiteralExpr block_size_val;
+nonterminal Pair<Option, Object> column_option;
+nonterminal Map<Option, Object> column_options_map;

 // For GRANT/REVOKE/AUTH DDL statements
 nonterminal ShowRolesStmt show_roles_stmt;
@@ -487,6 +501,7 @@ nonterminal TFunctionCategory opt_function_category;
 precedence left KW_OR;
 precedence left KW_AND;
 precedence right KW_NOT, NOT;
+precedence left KW_DEFAULT;
 precedence left KW_BETWEEN, KW_IN, KW_IS, KW_EXISTS;
 precedence left KW_LIKE, KW_RLIKE, KW_ILIKE, KW_REGEXP, KW_IREGEXP;
 precedence left EQUAL, NOTEQUAL, LESSTHAN, GREATERTHAN, KW_FROM, KW_DISTINCT;
@@ -789,31 +804,33 @@ opt_kw_table ::=

 show_roles_stmt ::=
   KW_SHOW KW_ROLES
   {: RESULT = new ShowRolesStmt(false, null); :}
-  | KW_SHOW KW_ROLE KW_GRANT KW_GROUP IDENT:group
+  | KW_SHOW KW_ROLE KW_GRANT KW_GROUP ident_or_default:group
   {: RESULT = new ShowRolesStmt(false, group); :}
   | KW_SHOW KW_CURRENT KW_ROLES
   {: RESULT = new ShowRolesStmt(true, null); :}
   ;

 show_grant_role_stmt ::=
-  KW_SHOW KW_GRANT KW_ROLE IDENT:role
+  KW_SHOW KW_GRANT KW_ROLE ident_or_default:role
   {: RESULT = new ShowGrantRoleStmt(role, null); :}
-  | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON server_ident:server_kw
+  | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON server_ident:server_kw
   {:
     RESULT = new ShowGrantRoleStmt(role,
         PrivilegeSpec.createServerScopedPriv(TPrivilegeLevel.ALL));
   :}
-  | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON KW_DATABASE IDENT:db_name
+  | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON
+    KW_DATABASE ident_or_default:db_name
   {:
     RESULT = new ShowGrantRoleStmt(role,
         PrivilegeSpec.createDbScopedPriv(TPrivilegeLevel.ALL, db_name));
   :}
-  | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON KW_TABLE table_name:tbl_name
+  | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON KW_TABLE table_name:tbl_name
   {:
     RESULT = new ShowGrantRoleStmt(role,
         PrivilegeSpec.createTableScopedPriv(TPrivilegeLevel.ALL, tbl_name));
   :}
-  | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON uri_ident:uri_kw STRING_LITERAL:uri
+  | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON uri_ident:uri_kw
+    STRING_LITERAL:uri
   {:
     RESULT = new ShowGrantRoleStmt(role,
         PrivilegeSpec.createUriScopedPriv(TPrivilegeLevel.ALL, new HdfsUri(uri)));
   :}
   ;
@@ -821,40 +838,40 @@

 create_drop_role_stmt ::=
-  KW_CREATE KW_ROLE IDENT:role
+  KW_CREATE KW_ROLE ident_or_default:role
   {: RESULT = new CreateDropRoleStmt(role, false); :}
-  | KW_DROP KW_ROLE IDENT:role
+  | KW_DROP KW_ROLE ident_or_default:role
   {: RESULT = new CreateDropRoleStmt(role, true); :}
   ;

 grant_role_stmt ::=
-  KW_GRANT KW_ROLE IDENT:role KW_TO KW_GROUP IDENT:group
+  KW_GRANT KW_ROLE ident_or_default:role KW_TO KW_GROUP ident_or_default:group
   {: RESULT = new GrantRevokeRoleStmt(role, group, true); :}
   ;

 revoke_role_stmt ::=
-  KW_REVOKE KW_ROLE IDENT:role KW_FROM KW_GROUP IDENT:group
+  KW_REVOKE KW_ROLE ident_or_default:role KW_FROM KW_GROUP ident_or_default:group
   {: RESULT = new GrantRevokeRoleStmt(role, group, false); :}
   ;

 grant_privilege_stmt ::=
-  KW_GRANT privilege_spec:priv KW_TO opt_kw_role:opt_role IDENT:role
+  KW_GRANT privilege_spec:priv KW_TO opt_kw_role:opt_role ident_or_default:role
     opt_with_grantopt:grant_opt
   {: RESULT = new GrantRevokePrivStmt(role, priv, true, grant_opt); :}
   ;

 revoke_privilege_stmt ::=
   KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
-    opt_kw_role:opt_role IDENT:role
+    opt_kw_role:opt_role ident_or_default:role
   {: RESULT = new GrantRevokePrivStmt(role, priv, false, grant_opt); :}
   ;

 privilege_spec ::=
   privilege:priv KW_ON server_ident:server_kw
   {: RESULT = PrivilegeSpec.createServerScopedPriv(priv); :}
-  | privilege:priv KW_ON server_ident:server_kw IDENT:server_name
+  | privilege:priv KW_ON server_ident:server_kw ident_or_default:server_name
   {: RESULT = PrivilegeSpec.createServerScopedPriv(priv, server_name); :}
-  | privilege:priv KW_ON KW_DATABASE IDENT:db_name
+  | privilege:priv KW_ON KW_DATABASE ident_or_default:db_name
   {: RESULT = PrivilegeSpec.createDbScopedPriv(priv, db_name); :}
   | privilege:priv KW_ON KW_TABLE table_name:tbl_name
   {: RESULT = PrivilegeSpec.createTableScopedPriv(priv, tbl_name); :}
@@ -902,9 +919,9 @@ alter_tbl_stmt ::=
     RESULT = new AlterTableAddPartitionStmt(table, partition, location,
         if_not_exists, cache_op);
   :}
-  | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column IDENT:col_name
+  | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column ident_or_default:col_name
   {: RESULT = new AlterTableDropColStmt(table, col_name); :}
-  | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column IDENT:col_name
+  | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column ident_or_default:col_name
     column_def:col_def
   {: RESULT = new AlterTableChangeColStmt(table, col_name, col_def); :}
   | KW_ALTER KW_TABLE table_name:table KW_DROP if_exists_val:if_exists
@@ -927,7 +944,7 @@ alter_tbl_stmt ::=
     table_property_type:target LPAREN properties_map:properties RPAREN
   {: RESULT = new AlterTableSetTblProperties(table, partitions, target, properties); :}
   | KW_ALTER KW_TABLE table_name:table opt_partition_set:partition KW_SET
-    KW_COLUMN KW_STATS IDENT:col LPAREN properties_map:map RPAREN
+    KW_COLUMN KW_STATS ident_or_default:col LPAREN properties_map:map RPAREN
   {:
     // The opt_partition_set is used to avoid conflicts even though
     // a partition clause does not make sense for this stmt. If a partition
@@ -966,8 +983,8 @@ replace_existing_cols_val ::=
   ;

 create_db_stmt ::=
-  KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists IDENT:db_name
-    comment_val:comment location_val:location
+  KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists ident_or_default:db_name
+    opt_comment_val:comment location_val:location
   {: RESULT = new CreateDbStmt(db_name, comment, location, if_not_exists); :}
   ;
@@ -1032,9 +1049,9 @@ create_tbl_stmt ::=
     RESULT = new CreateTableStmt(tbl_def);
   :}
   | tbl_def_with_col_defs:tbl_def
-    KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id IDENT:data_src_name
+    KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id ident_or_default:data_src_name
     opt_init_string_val:init_string
-    comment_val:comment
+    opt_comment_val:comment
   {:
     // Need external_val in the grammar to avoid shift/reduce conflict with other
     // CREATE TABLE statements.
@@ -1067,7 +1084,7 @@ create_tbl_like_stmt ::=
   tbl_def_without_col_defs:tbl_def
     KW_LIKE table_name:other_table
-    comment_val:comment
+    opt_comment_val:comment
     file_format_create_table_val:file_format location_val:location
   {:
     RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), other_table,
@@ -1089,7 +1106,8 @@ tbl_def_with_col_defs ::=
     tbl_def.getColumnDefs().addAll(list);
     RESULT = tbl_def;
   :}
-  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA primary_keys:primary_keys RPAREN
+  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA
+    primary_keys:primary_keys RPAREN
   {:
     tbl_def.getColumnDefs().addAll(list);
     tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
@@ -1103,7 +1121,7 @@ primary_keys ::=
   ;

 tbl_options ::=
-  comment_val:comment row_format_val:row_format serde_properties:serde_props
+  opt_comment_val:comment row_format_val:row_format serde_properties:serde_props
     file_format_create_table_val:file_format location_val:location
     cache_op_val:cache_op tbl_properties:tbl_props
   {:
@@ -1280,6 +1298,11 @@ opt_cache_op_replication ::=
   ;

 comment_val ::=
+  KW_COMMENT STRING_LITERAL:comment
+  {: RESULT = comment; :}
+  ;
+
+opt_comment_val ::=
   KW_COMMENT STRING_LITERAL:comment
   {: RESULT = comment; :}
   | /* empty */
@@ -1422,20 +1445,81 @@ column_def_list ::=
   ;

 column_def ::=
-  IDENT:col_name type_def:type is_primary_key_val:primary_key comment_val:comment
-  {: RESULT = new ColumnDef(col_name, type, primary_key, comment); :}
+  ident_or_default:col_name type_def:type column_options_map:options
+  {: RESULT = new ColumnDef(col_name, type, options); :}
+  | ident_or_default:col_name type_def:type
+  {: RESULT = new ColumnDef(col_name, type); :}
+  ;
+
+column_options_map ::=
+  column_options_map:map column_option:col_option
+  {:
+    if (map.put(col_option.first, col_option.second) != null) {
+      throw new Exception(String.format("Column option %s is specified multiple times",
+          col_option.first.toString()));
+    }
+    RESULT = map;
+  :}
+  | column_option:col_option
+  {:
+    Map<Option, Object> options = Maps.newHashMap();
+    options.put(col_option.first, col_option.second);
+    RESULT = options;
+  :}
+  ;
+
+column_option ::=
+  is_primary_key_val:primary_key
+  {: RESULT = new Pair<Option, Object>(Option.IS_PRIMARY_KEY, primary_key); :}
+  | nullability_val:nullability
+  {: RESULT = new Pair<Option, Object>(Option.IS_NULLABLE, nullability); :}
+  | encoding_val:encoding
+  {: RESULT = new Pair<Option, Object>(Option.ENCODING, encoding); :}
+  | compression_val:compression
+  {: RESULT = new Pair<Option, Object>(Option.COMPRESSION, compression); :}
+  | default_val:default_val
+  {: RESULT = new Pair<Option, Object>(Option.DEFAULT, default_val); :}
+  | block_size_val:block_size
+  {: RESULT = new Pair<Option, Object>(Option.BLOCK_SIZE, block_size); :}
+  | comment_val:comment
+  {: RESULT = new Pair<Option, Object>(Option.COMMENT, comment); :}
+  ;

 is_primary_key_val ::=
   KW_PRIMARY key_ident
   {: RESULT = true; :}
-  | /* empty */
+  ;
+
+nullability_val ::=
+  KW_NOT KW_NULL
   {: RESULT = false; :}
+  | KW_NULL
+  {: RESULT = true; :}
+  ;
+
+encoding_val ::=
+  KW_ENCODING ident_or_default:encoding_ident
+  {: RESULT = encoding_ident; :}
+  ;
+
+compression_val ::=
+  KW_COMPRESSION ident_or_default:compression_ident
+  {: RESULT = compression_ident; :}
+  ;
+
+default_val ::=
+  KW_DEFAULT expr:default_val
+  {: RESULT = default_val; :}
+  ;
+
+block_size_val ::=
+  KW_BLOCKSIZE literal:block_size
+  {: RESULT = block_size; :}
   ;

 create_view_stmt ::=
   KW_CREATE KW_VIEW if_not_exists_val:if_not_exists table_name:view_name
-    view_column_defs:col_defs comment_val:comment KW_AS query_stmt:view_def
+    view_column_defs:col_defs opt_comment_val:comment KW_AS query_stmt:view_def
   {:
     RESULT = new CreateViewStmt(if_not_exists, view_name, col_defs, comment,
         view_def);
   :}
@@ -1443,7 +1527,7 @@ create_view_stmt ::=

 create_data_src_stmt ::=
   KW_CREATE KW_DATA source_ident:is_source_id
-    if_not_exists_val:if_not_exists IDENT:data_src_name
+    if_not_exists_val:if_not_exists ident_or_default:data_src_name
     KW_LOCATION STRING_LITERAL:location
     KW_CLASS STRING_LITERAL:class_name
     KW_API_VERSION STRING_LITERAL:api_version
@@ -1534,8 +1618,12 @@ view_column_def_list ::=
   ;

 view_column_def ::=
-  IDENT:col_name comment_val:comment
-  {: RESULT = new ColumnDef(col_name, null, comment); :}
+  ident_or_default:col_name opt_comment_val:comment
+  {:
+    Map<Option, Object> options = Maps.newHashMap();
+    if (comment != null) options.put(Option.COMMENT, comment);
+    RESULT = new ColumnDef(col_name, null, options);
+  :}
   ;

 alter_view_stmt ::=
@@ -1571,7 +1659,8 @@ drop_stats_stmt ::=
   ;

 drop_db_stmt ::=
-  KW_DROP db_or_schema_kw if_exists_val:if_exists IDENT:db_name cascade_val:cascade
+  KW_DROP db_or_schema_kw if_exists_val:if_exists ident_or_default:db_name
+    cascade_val:cascade
   {: RESULT = new DropDbStmt(db_name, if_exists, cascade); :}
   ;
@@ -1593,7 +1682,8 @@ drop_function_stmt ::=
   ;

 drop_data_src_stmt ::=
-  KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists IDENT:data_src_name
+  KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists
+    ident_or_default:data_src_name
   {: RESULT = new DropDataSrcStmt(data_src_name, if_exists); :}
   ;
@@ -1683,7 +1773,7 @@ static_partition_key_value_list ::=

 partition_key_value ::=
   // Dynamic partition key values.
-  IDENT:column
+  ident_or_default:column
   {: RESULT = new PartitionKeyValue(column, null); :}
   | static_partition_key_value:partition
   {: RESULT = partition; :}
@@ -1691,7 +1781,7 @@ partition_key_value ::=

 static_partition_key_value ::=
   // Static partition key values.
-  IDENT:column EQUAL expr:e
+  ident_or_default:column EQUAL expr:e
   {: RESULT = new PartitionKeyValue(column, e); :}
   ;
@@ -1825,11 +1915,12 @@ opt_with_clause ::=
   ;

 with_view_def ::=
-  IDENT:alias KW_AS LPAREN query_stmt:query RPAREN
+  ident_or_default:alias KW_AS LPAREN query_stmt:query RPAREN
   {: RESULT = new View(alias, query, null); :}
   | STRING_LITERAL:alias KW_AS LPAREN query_stmt:query RPAREN
   {: RESULT = new View(alias, query, null); :}
-  | IDENT:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN query_stmt:query RPAREN
+  | ident_or_default:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN
+    query_stmt:query RPAREN
   {: RESULT = new View(alias, query, col_names); :}
   | STRING_LITERAL:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN
     query_stmt:query RPAREN
@@ -1957,7 +2048,7 @@ values_operand_list ::=
   ;

 use_stmt ::=
-  KW_USE IDENT:db
+  KW_USE ident_or_default:db
   {: RESULT = new UseStmt(db); :}
   ;
@@ -1966,9 +2057,9 @@ show_tables_stmt ::=
   {: RESULT = new ShowTablesStmt(); :}
   | KW_SHOW KW_TABLES show_pattern:showPattern
   {: RESULT = new ShowTablesStmt(showPattern); :}
-  | KW_SHOW KW_TABLES KW_IN IDENT:db
+  | KW_SHOW KW_TABLES KW_IN ident_or_default:db
   {: RESULT = new ShowTablesStmt(db, null); :}
-  | KW_SHOW KW_TABLES KW_IN IDENT:db show_pattern:showPattern
+  | KW_SHOW KW_TABLES KW_IN ident_or_default:db show_pattern:showPattern
   {: RESULT = new ShowTablesStmt(db, showPattern); :}
   ;
@@ -1996,9 +2087,9 @@ show_functions_stmt ::=
   {: RESULT = new ShowFunctionsStmt(null, null, fn_type); :}
   | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS show_pattern:showPattern
   {: RESULT = new ShowFunctionsStmt(null, showPattern, fn_type); :}
-  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN IDENT:db
+  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN ident_or_default:db
   {: RESULT = new ShowFunctionsStmt(db, null, fn_type); :}
-  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN IDENT:db
+  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN ident_or_default:db
     show_pattern:showPattern
   {: RESULT = new ShowFunctionsStmt(db, showPattern, fn_type); :}
   ;
@@ -2051,7 +2142,7 @@ show_files_stmt ::=
   ;

 describe_db_stmt ::=
-  KW_DESCRIBE db_or_schema_kw describe_output_style:style IDENT:db
+  KW_DESCRIBE db_or_schema_kw describe_output_style:style ident_or_default:db
   {: RESULT = new DescribeDbStmt(db, style); :}
   ;
@@ -2108,9 +2199,9 @@ select_clause ::=
   ;

 set_stmt ::=
-  KW_SET IDENT:key EQUAL literal:l
+  KW_SET ident_or_default:key EQUAL literal:l
   {: RESULT = new SetStmt(key, l.getStringValue()); :}
-  | KW_SET IDENT:key EQUAL IDENT:ident
+  | KW_SET ident_or_default:key EQUAL ident_or_default:ident
   {: RESULT = new SetStmt(key, ident); :}
   | KW_SET
   {: RESULT = new SetStmt(null, null); :}
@@ -2140,9 +2231,9 @@ select_list_item ::=
   ;

 alias_clause ::=
-  KW_AS IDENT:ident
+  KW_AS ident_or_default:ident
   {: RESULT = ident; :}
-  | IDENT:ident
+  | ident_or_default:ident
   {: RESULT = ident; :}
   | KW_AS STRING_LITERAL:l
   {: RESULT = l; :}
@@ -2158,9 +2249,9 @@ star_expr ::=
   ;

 table_name ::=
-  IDENT:tbl
+  ident_or_default:tbl
   {: RESULT = new TableName(null, tbl); :}
-  | IDENT:db DOT IDENT:tbl
+  | ident_or_default:db DOT ident_or_default:tbl
   {: RESULT = new TableName(db, tbl); :}
   ;
@@ -2295,13 +2386,13 @@ opt_plan_hints ::=
   ;

 ident_list ::=
-  IDENT:ident
+  ident_or_default:ident
   {:
     ArrayList<String> list = new ArrayList<String>();
     list.add(ident);
     RESULT = list;
   :}
-  | ident_list:list COMMA IDENT:ident
+  | ident_list:list COMMA ident_or_default:ident
   {:
     list.add(ident);
     RESULT = list;
   :}
@@ -2525,7 +2616,7 @@ function_call_expr ::=
   | function_name:fn_name LPAREN function_params:params RPAREN
   {: RESULT = FunctionCallExpr.createExpr(fn_name, params); :}
   // Below is a special case for EXTRACT. Idents are used to avoid adding new keywords.
-  | function_name:fn_name LPAREN IDENT:u KW_FROM expr:t RPAREN
+  | function_name:fn_name LPAREN ident_or_default:u KW_FROM expr:t RPAREN
   {: RESULT = new ExtractFromExpr(fn_name, u, t); :}
   ;
@@ -2828,13 +2919,13 @@ slot_ref ::=
   ;

 dotted_path ::=
-  IDENT:ident
+  ident_or_default:ident
   {:
     ArrayList<String> list = new ArrayList<String>();
     list.add(ident);
     RESULT = list;
   :}
-  | dotted_path:list DOT IDENT:ident
+  | dotted_path:list DOT ident_or_default:ident
   {:
     list.add(ident);
     RESULT = list;
   :}
@@ -2895,7 +2986,7 @@ type ::=
 // that we can parse type strings from the Hive Metastore which
 // may have unquoted identifiers corresponding to keywords.
 struct_field_def ::=
-  ident_or_keyword:name COLON type:t comment_val:comment
+  ident_or_keyword:name COLON type:t opt_comment_val:comment
   {: RESULT = new StructField(name, t, comment); :}
   ;
@@ -2913,6 +3004,13 @@ struct_field_def_list ::=
   :}
   ;

+ident_or_default ::=
+  IDENT:name
+  {: RESULT = name.toString(); :}
+  | KW_DEFAULT:name
+  {: RESULT = name.toString(); :}
+  ;
+
 ident_or_keyword ::=
   IDENT:r
   {: RESULT = r.toString(); :}
@@ -2946,6 +3044,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_BINARY:r
   {: RESULT = r.toString(); :}
+  | KW_BLOCKSIZE:r
+  {: RESULT = r.toString(); :}
   | KW_BOOLEAN:r
   {: RESULT = r.toString(); :}
   | KW_BUCKETS:r
@@ -2974,6 +3074,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_COMMENT:r
   {: RESULT = r.toString(); :}
+  | KW_COMPRESSION:r
+  {: RESULT = r.toString(); :}
   | KW_COMPUTE:r
   {: RESULT = r.toString(); :}
   | KW_CREATE:r
@@ -2994,6 +3096,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_DECIMAL:r
   {: RESULT = r.toString(); :}
+  | KW_DEFAULT:r
+  {: RESULT = r.toString(); :}
   | KW_DELETE:r
   {: RESULT = r.toString(); :}
   | KW_DELIMITED:r
@@ -3014,6 +3118,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_ELSE:r
   {: RESULT = r.toString(); :}
+  | KW_ENCODING:r
+  {: RESULT = r.toString(); :}
   | KW_END:r
   {: RESULT = r.toString(); :}
   | KW_ESCAPED:r

diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
index 0354117c1..feda13834 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
@@ -90,7 +90,7 @@ public class AlterTableAddReplaceColsStmt extends AlterTableStmt {
     // partition columns.
     Set<String> colNames = Sets.newHashSet();
     for (ColumnDef c: columnDefs_) {
-      c.analyze();
+      c.analyze(analyzer);
       String colName = c.getColName().toLowerCase();
       if (existingPartitionKeys.contains(colName)) {
         throw new AnalysisException(

diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java
index 913074042..5c4bfeeab 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java
@@ -90,7 +90,7 @@ public class AlterTableChangeColStmt extends AlterTableStmt {
     }

     // Check that the new column def's name is valid.
-    newColDef_.analyze();
+    newColDef_.analyze(analyzer);

     // Verify that if the column name is being changed, the new name doesn't conflict
     // with an existing column.
     if (!colName_.toLowerCase().equals(newColDef_.getColName().toLowerCase()) &&

diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
index 923a0a62e..f65aa277f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
@@ -18,21 +18,26 @@
 package org.apache.impala.analysis;

 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.LinkedHashMap;
 import java.util.Map;

 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MetaStoreUtil;
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
+import org.apache.kudu.ColumnSchema.Encoding;

 /**
  * Represents a column definition in a CREATE/ALTER TABLE/VIEW statement.
@@ -40,31 +45,89 @@ import org.apache.impala.util.MetaStoreUtil;
  * whereas column definitions in CREATE/ALTER VIEW statements infer the column type from
  * the corresponding view definition. All column definitions have an optional comment.
  * Since a column definition refers a column stored in the Metastore, the column name
- * must be valid according to the Metastore's rules (see @MetaStoreUtils).
+ * must be valid according to the Metastore's rules (see @MetaStoreUtils). A number of
+ * additional column options may be specified for Kudu tables.
 */
 public class ColumnDef {
   private final String colName_;
-  private String comment_;
-
   // Required in CREATE/ALTER TABLE stmts. Set to NULL in CREATE/ALTER VIEW stmts,
   // for which we setType() after analyzing the defining view definition stmt.
   private final TypeDef typeDef_;
   private Type type_;
+  private String comment_;

-  // Set to true if the user specified "PRIMARY KEY" in the column definition. Kudu table
-  // definitions may use this.
-  private boolean isPrimaryKey_;
-
-  public ColumnDef(String colName, TypeDef typeDef, String comment) {
-    this(colName, typeDef, false, comment);
+  // Available column options
+  public enum Option {
+    IS_PRIMARY_KEY,
+    IS_NULLABLE,
+    ENCODING,
+    COMPRESSION,
+    DEFAULT,
+    BLOCK_SIZE,
+    COMMENT
   }

-  public ColumnDef(String colName, TypeDef typeDef, boolean isPrimaryKey,
-      String comment) {
+  // Kudu-specific column options
+  //
+  // Set to true if the user specified "PRIMARY KEY" in the column definition.
+  private boolean isPrimaryKey_;
+  // Set to true if this column may contain null values. Can be NULL if
+  // not specified.
+  private Boolean isNullable_;
+  private String encodingVal_;
+  // Encoding for this column; set in analysis.
+  private Encoding encoding_;
+  private String compressionVal_;
+  // Compression algorithm for this column; set in analysis.
+  private CompressionAlgorithm compression_;
+  // Default value for this column.
+  private Expr defaultValue_;
+  // Desired block size for this column.
+  private LiteralExpr blockSize_;
+
+  public ColumnDef(String colName, TypeDef typeDef, Map<Option, Object> options) {
+    Preconditions.checkNotNull(options);
     colName_ = colName.toLowerCase();
     typeDef_ = typeDef;
-    isPrimaryKey_ = isPrimaryKey;
-    comment_ = comment;
+    for (Map.Entry<Option, Object> option: options.entrySet()) {
+      switch (option.getKey()) {
+        case IS_PRIMARY_KEY:
+          Preconditions.checkState(option.getValue() instanceof Boolean);
+          isPrimaryKey_ = (Boolean) option.getValue();
+          break;
+        case IS_NULLABLE:
+          Preconditions.checkState(option.getValue() instanceof Boolean);
+          isNullable_ = (Boolean) option.getValue();
+          break;
+        case ENCODING:
+          Preconditions.checkState(option.getValue() instanceof String);
+          encodingVal_ = ((String) option.getValue()).toUpperCase();
+          break;
+        case COMPRESSION:
+          Preconditions.checkState(option.getValue() instanceof String);
+          compressionVal_ = ((String) option.getValue()).toUpperCase();
+          break;
+        case DEFAULT:
+          Preconditions.checkState(option.getValue() instanceof Expr);
+          defaultValue_ = (Expr) option.getValue();
+          break;
+        case BLOCK_SIZE:
+          Preconditions.checkState(option.getValue() instanceof LiteralExpr);
+          blockSize_ = (LiteralExpr) option.getValue();
+          break;
+        case COMMENT:
+          Preconditions.checkState(option.getValue() instanceof String);
+          comment_ = (String) option.getValue();
+          break;
+        default:
+          throw new IllegalStateException(String.format("Illegal option %s",
+              option.getKey()));
+      }
+    }
+  }
+
+  public ColumnDef(String colName, TypeDef typeDef) {
+    this(colName, typeDef, Collections.<Option, Object>emptyMap());
   }

   /**
@@ -81,8 +144,7 @@ public class ColumnDef {
     colName_ = fs.getName();
     typeDef_ = new TypeDef(type);
     comment_ = fs.getComment();
-    isPrimaryKey_ = false;
-    analyze();
+    analyze(null);
   }

   public String getColName() { return colName_; }
@@ -92,8 +154,13 @@ public class ColumnDef {
   boolean isPrimaryKey() { return isPrimaryKey_; }
   public void setComment(String comment) { comment_ = comment; }
   public String getComment() { return comment_; }
+  public boolean hasKuduOptions() {
+    return isPrimaryKey_ || isNullable_ != null || encodingVal_ != null
+        || compressionVal_ != null || defaultValue_ != null || blockSize_ != null;
+  }
+  public boolean isNullable() { return isNullable_ != null && isNullable_; }

-  public void analyze() throws AnalysisException {
+  public void analyze(Analyzer analyzer) throws AnalysisException {
     // Check whether the column name meets the Metastore's requirements.
     if (!MetaStoreUtils.validateName(colName_)) {
       throw new AnalysisException("Invalid column/field name: " + colName_);
@@ -112,6 +179,10 @@ public class ColumnDef {
           "%s has %d characters.", colName_, MetaStoreUtil.MAX_TYPE_NAME_LENGTH,
           typeSql, typeSql.length()));
     }
+    if (hasKuduOptions()) {
+      Preconditions.checkNotNull(analyzer);
+      analyzeKuduOptions(analyzer);
+    }
     if (comment_ != null &&
         comment_.length() > MetaStoreUtil.CREATE_MAX_COMMENT_LENGTH) {
       throw new AnalysisException(String.format(
@@ -121,6 +192,79 @@ public class ColumnDef {
     }
   }

+  private void analyzeKuduOptions(Analyzer analyzer) throws AnalysisException {
+    if (isPrimaryKey_ && isNullable_ != null && isNullable_) {
+      throw new AnalysisException("Primary key columns cannot be nullable: " +
+          toString());
+    }
+    // Encoding value
+    if (encodingVal_ != null) {
+      try {
+        encoding_ = Encoding.valueOf(encodingVal_);
+      } catch (IllegalArgumentException e) {
+        throw new AnalysisException(String.format("Unsupported encoding value '%s'. " +
+            "Supported encoding values are: %s", encodingVal_,
+            Joiner.on(", ").join(Encoding.values())));
+      }
+    }
+    // Compression algorithm
+    if (compressionVal_ != null) {
+      try {
+        compression_ = CompressionAlgorithm.valueOf(compressionVal_);
+      } catch (IllegalArgumentException e) {
+        throw new AnalysisException(String.format("Unsupported compression " +
+            "algorithm '%s'. Supported compression algorithms are: %s", compressionVal_,
+            Joiner.on(", ").join(CompressionAlgorithm.values())));
+      }
+    }
+    // Analyze the default value, if any.
+    // TODO: Similar checks are applied for range partition values in
+    // RangePartition.analyzeBoundaryValue(). Consider consolidating the logic into a
+    // single function.
+    if (defaultValue_ != null) {
+      try {
+        defaultValue_.analyze(analyzer);
+      } catch (AnalysisException e) {
+        throw new AnalysisException(String.format("Only constant values are allowed " +
+            "for default values: %s", defaultValue_.toSql()), e);
+      }
+      if (!defaultValue_.isConstant()) {
+        throw new AnalysisException(String.format("Only constant values are allowed " +
+            "for default values: %s", defaultValue_.toSql()));
+      }
+      defaultValue_ = LiteralExpr.create(defaultValue_, analyzer.getQueryCtx());
+      if (defaultValue_ == null) {
+        throw new AnalysisException(String.format("Only constant values are allowed " +
+            "for default values: %s", defaultValue_.toSql()));
+      }
+      if (defaultValue_.getType().isNull() && ((isNullable_ != null && !isNullable_)
+          || isPrimaryKey_)) {
+        throw new AnalysisException(String.format("Default value of NULL not allowed " +
+            "on non-nullable column: '%s'", getColName()));
+      }
+      if (!Type.isImplicitlyCastable(defaultValue_.getType(), type_, true)) {
+        throw new AnalysisException(String.format("Default value %s (type: %s) " +
+            "is not compatible with column '%s' (type: %s).", defaultValue_.toSql(),
+            defaultValue_.getType().toSql(), colName_, type_.toSql()));
+      }
+      if (!defaultValue_.getType().equals(type_)) {
+        Expr castLiteral = defaultValue_.uncheckedCastTo(type_);
+        Preconditions.checkNotNull(castLiteral);
+        defaultValue_ = LiteralExpr.create(castLiteral, analyzer.getQueryCtx());
+      }
+      Preconditions.checkNotNull(defaultValue_);
+    }
+
+    // Analyze the block size value, if any.
+    if (blockSize_ != null) {
+      blockSize_.analyze(null);
+      if (!blockSize_.getType().isIntegerType()) {
+        throw new AnalysisException(String.format("Invalid value for BLOCK_SIZE: %s. " +
+            "A positive INTEGER value is expected.", blockSize_.toSql()));
+      }
+    }
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder(colName_).append(" ");
@@ -130,6 +274,11 @@ public class ColumnDef {
       sb.append(typeDef_);
     }
     if (isPrimaryKey_) sb.append(" PRIMARY KEY");
+    if (isNullable_ != null) sb.append(isNullable_ ? " NULL" : " NOT NULL");
+    if (encoding_ != null) sb.append(" ENCODING " + encoding_.toString());
+    if (compression_ != null) sb.append(" COMPRESSION " + compression_.toString());
+    if (defaultValue_ != null) sb.append(" DEFAULT_VALUE " + defaultValue_.toSql());
+    if (blockSize_ != null) sb.append(" BLOCK_SIZE " + blockSize_.toSql());
     if (comment_ != null) sb.append(String.format(" COMMENT '%s'", comment_));
     return sb.toString();
   }
@@ -146,12 +295,21 @@ public class ColumnDef {
         .append(isPrimaryKey_, rhs.isPrimaryKey_)
         .append(typeDef_, rhs.typeDef_)
         .append(type_, rhs.type_)
+        .append(isNullable_, rhs.isNullable_)
+        .append(encoding_, rhs.encoding_)
+        .append(compression_, rhs.compression_)
+        .append(defaultValue_, rhs.defaultValue_)
+        .append(blockSize_, rhs.blockSize_)
         .isEquals();
   }

   public TColumn toThrift() {
-    TColumn col = new TColumn(new TColumn(getColName(), type_.toThrift()));
-    col.setComment(getComment());
+    TColumn col = new TColumn(getColName(), type_.toThrift());
+    Integer blockSize =
+        blockSize_ == null ? null : (int) ((NumericLiteral) blockSize_).getIntValue();
+    KuduUtil.setColumnOptions(col, isPrimaryKey_, isNullable_, encoding_,
+        compression_, defaultValue_, blockSize);
+    if (comment_ != null) col.setComment(comment_);
     return col;
   }

diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
index 54b855791..5f524f5d2 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;

 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;

@@ -114,7 +115,7 @@ public abstract class CreateOrAlterViewStmtBase extends StatementBase {
       List<String> labels = viewDefStmt_.getColLabels();
       Preconditions.checkState(exprs.size() == labels.size());
       for (int i = 0; i < viewDefStmt_.getColLabels().size(); ++i) {
-        ColumnDef colDef = new ColumnDef(labels.get(i), null, null);
+        ColumnDef colDef = new ColumnDef(labels.get(i), null);
         colDef.setType(exprs.get(i).getType());
         finalColDefs_.add(colDef);
       }
@@ -124,7 +125,7 @@ public abstract class CreateOrAlterViewStmtBase extends StatementBase {
     // duplicate column names.
     Set<String> distinctColNames = Sets.newHashSet();
     for (ColumnDef colDesc: finalColDefs_) {
-      colDesc.analyze();
+      colDesc.analyze(null);
       if (!distinctColNames.add(colDesc.getColName().toLowerCase())) {
         throw new AnalysisException("Duplicate column name: " + colDesc.getColName());
       }

diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 1e53d1e80..5dca6b56d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -17,8 +17,9 @@

 package org.apache.impala.analysis;

-import java.util.EnumSet;
+import java.util.Collections;
 import java.util.List;
+import java.util.EnumSet;

 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.Db;
@@ -147,7 +148,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
               "mismatch: %s != %s", partitionLabel, colLabel));
         }

-        ColumnDef colDef = new ColumnDef(colLabel, null, null);
+        ColumnDef colDef = new ColumnDef(colLabel, null);
         colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
         createStmt_.getPartitionColumnDefs().add(colDef);
       }
@@ -159,8 +160,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
       int colCnt = tmpQueryStmt.getColLabels().size();
       createStmt_.getColumnDefs().clear();
       for (int i = 0; i < colCnt; ++i) {
-        ColumnDef colDef = new ColumnDef(
-            tmpQueryStmt.getColLabels().get(i), null, null);
+        ColumnDef colDef = new ColumnDef(tmpQueryStmt.getColLabels().get(i), null,
+            Collections.<ColumnDef.Option, Object>emptyMap());
         colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
         createStmt_.getColumnDefs().add(colDef);
       }

diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index a653323f1..432e9c141 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -21,9 +21,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;

 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -36,7 +38,6 @@ import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.StructField;
@@ -328,8 +329,9 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
       Type type = convertParquetType(field);
       Preconditions.checkNotNull(type);
       String colName = field.getName();
-      schema.add(new ColumnDef(colName, new TypeDef(type),
-          "Inferred from Parquet file."));
+      Map<ColumnDef.Option, Object> option = Maps.newHashMap();
+      option.put(ColumnDef.Option.COMMENT, "Inferred from Parquet file.");
+      schema.add(new ColumnDef(colName, new TypeDef(type), option));
     }
     return schema;
   }

diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 55005f94a..87f7cefb5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -27,6 +27,7 @@ import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
@@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;

 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -126,18 +126,20 @@ public class InsertStmt extends StatementBase {

   // Output expressions that produce the final results to write to the target table. May
   // include casts. Set in prepareExpressions().
-  // If this is an INSERT, will contain one Expr for all non-partition columns of the
-  // target table with NullLiterals where an output column isn't explicitly mentioned.
-  // The i'th expr produces the i'th column of the target table.
-  // If this is an UPSERT, will contain one Expr per column mentioned in the query and
-  // mentionedUpsertColumns_ is used to map between the Exprs and columns in the target
-  // table.
+  // If this is an INSERT on a non-Kudu table, it will contain one Expr for all
+  // non-partition columns of the target table with NullLiterals where an output
+  // column isn't explicitly mentioned. The i'th expr produces the i'th column of
+  // the target table.
+  //
+  // For Kudu tables (INSERT and UPSERT operations), it will contain one Expr per column
+  // mentioned in the query and mentionedColumns_ is used to map between the Exprs
+  // and columns in the target table.
   private ArrayList<Expr> resultExprs_ = Lists.newArrayList();

   // Position mapping of exprs in resultExprs_ to columns in the target table -
-  // resultExprs_[i] produces the mentionedUpsertColumns_[i] column of the target table.
-  // Only used for UPSERT, set in prepareExpressions().
-  private final List<Integer> mentionedUpsertColumns_ = Lists.newArrayList();
+  // resultExprs_[i] produces the mentionedColumns_[i] column of the target table.
+  // Only used for Kudu tables, set in prepareExpressions().
+  private final List<Integer> mentionedColumns_ = Lists.newArrayList();

   // Set in analyze(). Exprs corresponding to key columns of Kudu tables. Empty for
   // non-Kudu tables.
@@ -209,7 +211,7 @@ public class InsertStmt extends StatementBase {
     hasNoShuffleHint_ = false;
     hasClusteredHint_ = false;
     resultExprs_.clear();
-    mentionedUpsertColumns_.clear();
+    mentionedColumns_.clear();
     primaryKeyExprs_.clear();
   }
@@ -277,8 +279,9 @@ public class InsertStmt extends StatementBase {
     // Finally, prepareExpressions analyzes the expressions themselves, and confirms that
     // they are type-compatible with the target columns. Where columns are not mentioned
     // (and by this point, we know that missing columns are not partition columns),
-    // prepareExpressions assigns them a NULL literal expressions, unless this is an
-    // UPSERT, in which case we don't want to overwrite unmentioned columns with NULL.
+    // prepareExpressions assigns them a NULL literal expressions, unless the target is
+    // a Kudu table, in which case we don't want to overwrite unmentioned columns with
+    // NULL.

     // A null permutation clause is the same as listing all non-partition columns in
     // order.
@@ -602,7 +605,7 @@ public class InsertStmt extends StatementBase {
    *
    * 3. Populates resultExprs_ with type-compatible expressions, in Hive column order,
    * for all expressions in the select-list. Unmentioned columns are assigned NULL literal
-   * expressions, unless this is an UPSERT.
+   * expressions, unless the target is a Kudu table.
    *
    * 4. Result exprs for key columns of Kudu tables are stored in primaryKeyExprs_.
    *
@@ -667,6 +670,7 @@ public class InsertStmt extends StatementBase {
       expr.analyze(analyzer);
     }

+    boolean isKuduTable = table_ instanceof KuduTable;
     // Finally, 'undo' the permutation so that the selectListExprs are in Hive column
     // order, and add NULL expressions to all missing columns, unless this is an UPSERT.
     ArrayList<Column> columns = table_.getColumnsInHiveOrder();
@@ -676,23 +680,32 @@ public class InsertStmt extends StatementBase {
       for (int i = 0; i < selectListExprs.size(); ++i) {
         if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
           resultExprs_.add(selectListExprs.get(i));
-          if (isUpsert_) mentionedUpsertColumns_.add(col);
+          if (isKuduTable) mentionedColumns_.add(col);
           matchFound = true;
           break;
         }
       }
       // If no match is found, either the column is a clustering column with a static
       // value, or it was unmentioned and therefore should have a NULL select-list
-      // expression if this is an INSERT.
+      // expression if this is an INSERT and the target is not a Kudu table.
       if (!matchFound) {
-        if (tblColumn.getPosition() >= numClusteringCols && !isUpsert_) {
-          // Unmentioned non-clustering columns get NULL literals with the appropriate
-          // target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
-          resultExprs_.add(NullLiteral.create(tblColumn.getType()));
+        if (tblColumn.getPosition() >= numClusteringCols) {
+          if (isKuduTable) {
+            Preconditions.checkState(tblColumn instanceof KuduColumn);
+            KuduColumn kuduCol = (KuduColumn) tblColumn;
+            if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()) {
+              throw new AnalysisException("Missing values for column that is not " +
+                  "nullable and has no default value " + kuduCol.getName());
+            }
+          } else {
+            // Unmentioned non-clustering columns get NULL literals with the appropriate
+            // target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
+            resultExprs_.add(NullLiteral.create(tblColumn.getType()));
+          }
         }
       }
       // Store exprs for Kudu key columns.
-      if (matchFound && table_ instanceof KuduTable) {
+      if (matchFound && isKuduTable) {
         KuduTable kuduTable = (KuduTable) table_;
         if (kuduTable.isPrimaryKeyColumn(tblColumn.getName())) {
           primaryKeyExprs_.add(Iterables.getLast(resultExprs_));
@@ -779,9 +792,8 @@ public class InsertStmt extends StatementBase {
   public DataSink createDataSink() {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
-    Preconditions.checkState(isUpsert_ || mentionedUpsertColumns_.isEmpty());
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
-        partitionKeyExprs_, mentionedUpsertColumns_, overwrite_);
+        partitionKeyExprs_, mentionedColumns_, overwrite_);
   }

   /**

diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index ce08e3633..1c16954ab 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -174,7 +174,7 @@ class TableDef {
     Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
     fqTableName_ = analyzer.getFqTableName(getTblName());
     fqTableName_.analyze();
-    analyzeColumnDefs();
+    analyzeColumnDefs(analyzer);
     analyzePrimaryKeys();

     if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE)
@@ -194,16 +194,20 @@ class TableDef {
    * Analyzes table and partition column definitions, checking whether all column
    * names are unique.
    */
-  private void analyzeColumnDefs() throws AnalysisException {
+  private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException {
     Set<String> colNames = Sets.newHashSet();
     for (ColumnDef colDef: columnDefs_) {
-      colDef.analyze();
+      colDef.analyze(analyzer);
       if (!colNames.add(colDef.getColName().toLowerCase())) {
         throw new AnalysisException("Duplicate column name: " + colDef.getColName());
       }
+      if (getFileFormat() != THdfsFileFormat.KUDU && colDef.hasKuduOptions()) {
+        throw new AnalysisException(String.format("Unsupported column options for " +
+            "file format '%s': '%s'", getFileFormat().name(), colDef.toString()));
+      }
     }
     for (ColumnDef colDef: getPartitionColumnDefs()) {
-      colDef.analyze();
+      colDef.analyze(analyzer);
       if (!colDef.getType().supportsTablePartitioning()) {
         throw new AnalysisException(
             String.format("Type '%s' is not supported as partition-column type " +
@@ -247,6 +251,10 @@ class TableDef {
         throw new AnalysisException(String.format(
             "PRIMARY KEY column '%s' does not exist in the table", colName));
       }
+      if (colDef.isNullable()) {
+        throw new AnalysisException("Primary key columns cannot be nullable: " +
+            colDef.toString());
+      }
       primaryKeyColDefs_.add(colDef);
     }
   }

diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index aa24336c8..f01a78c45 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -41,6 +41,7 @@ import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Table;
@@ -351,6 +352,21 @@ public class ToSqlUtils {
   private static String columnToSql(Column col) {
     StringBuilder sb = new StringBuilder(col.getName());
     if (col.getType() != null) sb.append(" " + col.getType().toSql());
+    if (col instanceof KuduColumn) {
+      KuduColumn kuduCol = (KuduColumn) col;
+      Boolean isNullable = kuduCol.isNullable();
+      if (isNullable != null) sb.append(isNullable ? " NULL" : " NOT NULL");
+      if (kuduCol.getEncoding() != null) sb.append(" ENCODING " + kuduCol.getEncoding());
+      if (kuduCol.getCompression() != null) {
+        sb.append(" COMPRESSION " + kuduCol.getCompression());
+      }
+      if (kuduCol.getDefaultValue() != null) {
+        sb.append(" DEFAULT " + kuduCol.getDefaultValue().toSql());
+      }
+      if (kuduCol.getBlockSize() != 0) {
+        sb.append(String.format(" BLOCK_SIZE %d", kuduCol.getBlockSize()));
+      }
+    }
     if (!Strings.isNullOrEmpty(col.getComment())) {
       sb.append(String.format(" COMMENT '%s'", col.getComment()));
     }

diff --git a/fe/src/main/java/org/apache/impala/catalog/Column.java b/fe/src/main/java/org/apache/impala/catalog/Column.java
index 91928aa1b..0830f61e5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Column.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Column.java
@@ -31,6 +31,8 @@ import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;

+import org.apache.impala.common.ImpalaRuntimeException;
+
 /**
  * Internal representation of column-related metadata.
  * Owned by Catalog instance.
@@ -84,7 +86,7 @@ public class Column {
         .add("position_", position_).toString();
   }

-  public static Column fromThrift(TColumn columnDesc) {
+  public static Column fromThrift(TColumn columnDesc) throws ImpalaRuntimeException {
     String comment = columnDesc.isSetComment() ? columnDesc.getComment() : null;
     Preconditions.checkState(columnDesc.isSetPosition());
     int position = columnDesc.getPosition();
@@ -98,11 +100,7 @@ public class Column {
           columnDesc.getColumn_qualifier(), columnDesc.isIs_binary(),
           Type.fromThrift(columnDesc.getColumnType()), comment, position);
     } else if (columnDesc.isIs_kudu_column()) {
-      Preconditions.checkState(columnDesc.isSetIs_key());
-      Preconditions.checkState(columnDesc.isSetIs_nullable());
-      col = new KuduColumn(columnDesc.getColumnName(), columnDesc.isIs_key(),
-          columnDesc.isIs_nullable(),
-          Type.fromThrift(columnDesc.getColumnType()), comment, position);
+      col = KuduColumn.fromThrift(columnDesc, position);
     } else {
       // Hdfs table column.
       col = new Column(columnDesc.getColumnName(),

diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
index 404dbf5aa..5640748e0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
@@ -17,36 +17,108 @@

 package org.apache.impala.catalog;

+import com.google.common.base.Preconditions;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.util.KuduUtil;
+
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
+import org.apache.kudu.ColumnSchema.Encoding;
+import org.apache.kudu.ColumnSchema;

 /**
- * Describes a Kudu column mapped to a Hive column (as described in the metastore).
- * This class extends Column with Kudu-specific information about whether it is part of a primary
- * key, and whether it is nullable.
+ * Represents a Kudu column.
+ *
+ * This class extends Column with Kudu-specific information:
+ * - primary key
+ * - nullability constraint
+ * - encoding
+ * - compression
+ * - default value
+ * - desired block size
  */
 public class KuduColumn extends Column {
   private final boolean isKey_;
   private final boolean isNullable_;
+  private final Encoding encoding_;
+  private final CompressionAlgorithm compression_;
+  private final LiteralExpr defaultValue_;
+  private final int blockSize_;

-  public KuduColumn(String name, boolean isKey, boolean isNullable, Type type,
-      String comment, int position) {
+  private KuduColumn(String name, Type type, boolean isKey, boolean isNullable,
+      Encoding encoding, CompressionAlgorithm compression, LiteralExpr defaultValue,
+      int blockSize, String comment, int position) {
     super(name, type, comment, position);
     isKey_ = isKey;
     isNullable_ = isNullable;
+    encoding_ = encoding;
+    compression_ = compression;
+    defaultValue_ = defaultValue;
+    blockSize_ = blockSize;
+  }
+
+  public static KuduColumn fromColumnSchema(ColumnSchema colSchema, int position)
+      throws ImpalaRuntimeException {
+    Type type = KuduUtil.toImpalaType(colSchema.getType());
+    Object defaultValue = colSchema.getDefaultValue();
+    LiteralExpr defaultValueExpr = null;
+    if (defaultValue != null) {
+      try {
+        defaultValueExpr = LiteralExpr.create(defaultValue.toString(), type);
+      } catch (AnalysisException e) {
+        throw new ImpalaRuntimeException(String.format("Error parsing default value: " +
+            "'%s'", defaultValue), e);
+      }
+      Preconditions.checkNotNull(defaultValueExpr);
+    }
+    return new KuduColumn(colSchema.getName(), type, colSchema.isKey(),
+        colSchema.isNullable(), colSchema.getEncoding(),
+        colSchema.getCompressionAlgorithm(), defaultValueExpr,
+        colSchema.getDesiredBlockSize(), null, position);
+  }
+
+  public static KuduColumn fromThrift(TColumn column, int position)
+      throws ImpalaRuntimeException {
+    Preconditions.checkState(column.isSetIs_key());
+    Preconditions.checkState(column.isSetIs_nullable());
+    Type columnType = Type.fromThrift(column.getColumnType());
+    Encoding encoding = null;
+    if (column.isSetEncoding()) encoding = KuduUtil.fromThrift(column.getEncoding());
+    CompressionAlgorithm compression = null;
+    if (column.isSetCompression()) {
+      compression = KuduUtil.fromThrift(column.getCompression());
+    }
+    LiteralExpr defaultValue = null;
+    if (column.isSetDefault_value()) {
+      defaultValue =
+          LiteralExpr.fromThrift(column.getDefault_value().getNodes().get(0), columnType);
+    }
+    int blockSize = 0;
+    if (column.isSetBlock_size()) blockSize = column.getBlock_size();
+    return new KuduColumn(column.getColumnName(), columnType, column.isIs_key(),
+        column.isIs_nullable(), encoding, compression, defaultValue, blockSize, null,
+        position);
   }

   public boolean isKey() { return isKey_; }
   public boolean isNullable() { return isNullable_; }
+  public Encoding getEncoding() { return encoding_; }
+  public CompressionAlgorithm getCompression() { return compression_; }
+  public LiteralExpr getDefaultValue() { return defaultValue_; }
+  public boolean hasDefaultValue() { return defaultValue_ != null; }
+  public int getBlockSize() { return blockSize_; }

   @Override
   public TColumn toThrift() {
     TColumn colDesc = new TColumn(name_, type_.toThrift());
+    KuduUtil.setColumnOptions(colDesc, isKey_, isNullable_, encoding_, compression_,
+        defaultValue_, blockSize_);
     if (comment_ != null) colDesc.setComment(comment_);
     colDesc.setCol_stats(getStats().toThrift());
     colDesc.setPosition(position_);
     colDesc.setIs_kudu_column(true);
colDesc.setIs_key(isKey_);
-    colDesc.setIs_nullable(isNullable_);
     return colDesc;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 7b906a730..0e8890583 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -215,12 +215,13 @@ public class KuduTable extends Table {
     cols.clear();
     int pos = 0;
     for (ColumnSchema colSchema: kuduTable.getSchema().getColumns()) {
-      Type type = KuduUtil.toImpalaType(colSchema.getType());
-      String colName = colSchema.getName();
-      cols.add(new FieldSchema(colName, type.toSql().toLowerCase(), null));
-      boolean isKey = colSchema.isKey();
-      if (isKey) primaryKeyColumnNames_.add(colName);
-      addColumn(new KuduColumn(colName, isKey, !isKey, type, null, pos));
+      KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos);
+      Preconditions.checkNotNull(kuduCol);
+      // Add the HMS column
+      cols.add(new FieldSchema(kuduCol.getName(), kuduCol.getType().toSql().toLowerCase(),
+          null));
+      if (kuduCol.isKey()) primaryKeyColumnNames_.add(kuduCol.getName());
+      addColumn(kuduCol);
       ++pos;
     }
   }
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index be9dc7b67..7bde786f7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -256,12 +256,17 @@ public abstract class Table implements CatalogObject {
     colsByPos_.clear();
     colsByPos_.ensureCapacity(columns.size());
-    for (int i = 0; i < columns.size(); ++i) {
-      Column col = Column.fromThrift(columns.get(i));
-      colsByPos_.add(col.getPosition(), col);
-      colsByName_.put(col.getName().toLowerCase(), col);
-      ((StructType) type_.getItemType()).addField(
-          new StructField(col.getName(), col.getType(), col.getComment()));
+    try {
+      for (int i = 0; i < columns.size(); ++i) {
+        Column col = Column.fromThrift(columns.get(i));
+        colsByPos_.add(col.getPosition(), col);
+        colsByName_.put(col.getName().toLowerCase(), col);
+        ((StructType) type_.getItemType()).addField(
+            new StructField(col.getName(), col.getType(), col.getComment()));
+      }
+    } catch (ImpalaRuntimeException e) {
+      throw new TableLoadingException(String.format("Error loading schema for " +
+          "table '%s'", getName()), e);
     }
 
     numClusteringCols_ = thriftTable.getClustering_columns().size();
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 52524fb06..658fd0ae5 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -186,6 +186,8 @@ public class Planner {
       Table targetTable = ctx_.getAnalysisResult().getInsertStmt().getTargetTable();
       graph.addTargetColumnLabels(targetTable);
       Preconditions.checkNotNull(targetTable);
+      // Lineage is not currently supported for Kudu tables (see IMPALA-4283)
+      if (targetTable instanceof KuduTable) return fragments;
      List<Expr> exprs = Lists.newArrayList();
      if (targetTable instanceof HBaseTable) {
        exprs.addAll(resultExprs);
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index a2b1fb9ad..068f42604 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -31,6 +31,7 @@ import 
org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.TDistributeParam;
 import org.apache.impala.thrift.TRangePartition;
@@ -74,7 +75,7 @@ public class KuduCatalogOpExecutor {
         throw new ImpalaRuntimeException(String.format(
             "Table '%s' already exists in Kudu.", kuduTableName));
       }
-      Schema schema = createTableSchema(msTbl, params);
+      Schema schema = createTableSchema(params);
       CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
       kudu.createTable(kuduTableName, schema, tableOpts);
     } catch (Exception e) {
@@ -86,22 +87,31 @@ public class KuduCatalogOpExecutor {
   /**
    * Creates the schema of a new Kudu table.
    */
-  private static Schema createTableSchema(
-      org.apache.hadoop.hive.metastore.api.Table msTbl, TCreateTableParams params)
+  private static Schema createTableSchema(TCreateTableParams params)
       throws ImpalaRuntimeException {
     Set<String> keyColNames = new HashSet<>(params.getPrimary_key_column_names());
-    List<FieldSchema> fieldSchemas = msTbl.getSd().getCols();
-    List<ColumnSchema> colSchemas = new ArrayList<>(fieldSchemas.size());
-    for (FieldSchema fieldSchema: fieldSchemas) {
-      Type type = Type.parseColumnType(fieldSchema.getType());
+    List<ColumnSchema> colSchemas = new ArrayList<>(params.getColumnsSize());
+    for (TColumn column: params.getColumns()) {
+      Type type = Type.fromThrift(column.getColumnType());
       Preconditions.checkState(type != null);
       org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);
       // Create the actual column and check if the column is a key column
       ColumnSchemaBuilder csb =
-          new ColumnSchemaBuilder(fieldSchema.getName(), kuduType);
-      boolean isKeyCol = keyColNames.contains(fieldSchema.getName());
-      csb.key(isKeyCol);
-      csb.nullable(!isKeyCol);
+          new ColumnSchemaBuilder(column.getColumnName(), kuduType);
+      Preconditions.checkState(column.isSetIs_key());
+      csb.key(keyColNames.contains(column.getColumnName()));
+      if (column.isSetIs_nullable()) csb.nullable(column.isIs_nullable());
+      if (column.isSetDefault_value()) {
+        csb.defaultValue(KuduUtil.getKuduDefaultValue(column.getDefault_value(), kuduType,
+            column.getColumnName()));
+      }
+      if (column.isSetBlock_size()) csb.desiredBlockSize(column.getBlock_size());
+      if (column.isSetEncoding()) {
+        csb.encoding(KuduUtil.fromThrift(column.getEncoding()));
+      }
+      if (column.isSetCompression()) {
+        csb.compressionAlgorithm(KuduUtil.fromThrift(column.getCompression()));
+      }
       colSchemas.add(csb.build());
     }
     return new Schema(colSchemas);
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
index 36a586be0..98407660c 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
@@ -29,10 +29,12 @@ import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
-import org.codehaus.jackson.JsonNode;
-
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.TypeDef;
 import org.apache.impala.catalog.ArrayType;
@@ -42,8 +44,7 @@ import org.apache.impala.catalog.StructField;
 import 
org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.codehaus.jackson.JsonNode;
 
 /**
  * Utility class used to parse Avro schema. Checks that the schema is valid
@@ -81,9 +82,12 @@ public class AvroSchemaParser {
     }
     List<ColumnDef> colDefs = Lists.newArrayListWithCapacity(schema.getFields().size());
     for (Schema.Field field: schema.getFields()) {
+      Map<ColumnDef.Option, Object> option = Maps.newHashMap();
+      String comment = field.doc();
+      if (comment != null) option.put(ColumnDef.Option.COMMENT, comment);
       ColumnDef colDef = new ColumnDef(field.name(),
-          new TypeDef(getTypeInfo(field.schema(), field.name())), field.doc());
-      colDef.analyze();
+          new TypeDef(getTypeInfo(field.schema(), field.name())), option);
+      colDef.analyze(null);
       colDefs.add(colDef);
     }
     return colDefs;
@@ -201,4 +205,4 @@ public class AvroSchemaParser {
     }
     return propValue;
   }
-}
\ No newline at end of file
+}
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
index f5b3bb4f0..1da466e61 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
@@ -23,19 +23,19 @@ import java.net.URL;
 import java.util.List;
 import java.util.Map;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.io.IOUtils;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
 /**
  * Contains utility functions for dealing with Avro schemas. 
*/
@@ -139,10 +139,13 @@ public class AvroSchemaUtils {
     if ((colDef.getType().isStringType() && avroCol.getType().isStringType())) {
       Preconditions.checkState(
           avroCol.getType().getPrimitiveType() == PrimitiveType.STRING);
+      Map<ColumnDef.Option, Object> option = Maps.newHashMap();
+      String comment = avroCol.getComment();
+      if (comment != null) option.put(ColumnDef.Option.COMMENT, comment);
       ColumnDef reconciledColDef = new ColumnDef(
-          avroCol.getColName(), colDef.getTypeDef(), avroCol.getComment());
+          avroCol.getColName(), colDef.getTypeDef(), option);
       try {
-        reconciledColDef.analyze();
+        reconciledColDef.analyze(null);
       } catch (AnalysisException e) {
         Preconditions.checkNotNull(
             null, "reconciledColDef.analyze() should never throw.");
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 65fae743a..dd09a2819 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -29,18 +29,26 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TColumnEncoding;
+import org.apache.impala.thrift.THdfsCompression;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnSchema.Encoding;
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduClient.KuduClientBuilder;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RangePartitionBound;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class KuduUtil {
 
   private static final String KUDU_TABLE_NAME_PREFIX = "impala::";
@@ -138,6 +146,148 @@ public class KuduUtil {
     }
   }
 
+  public static Object getKuduDefaultValue(TExpr defaultValue,
+      org.apache.kudu.Type type, String colName) throws ImpalaRuntimeException {
+    Preconditions.checkState(defaultValue.getNodes().size() == 1);
+    TExprNode literal = defaultValue.getNodes().get(0);
+    switch (type) {
+      case BOOL:
+        checkCorrectType(literal.isSetBool_literal(), type, colName, literal);
+        return literal.getBool_literal().isValue();
+      case INT8:
+        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
+        return (byte) literal.getInt_literal().getValue();
+      case INT16:
+        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
+        return (short) literal.getInt_literal().getValue();
+      case INT32:
+        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
+        return (int) literal.getInt_literal().getValue();
+      case INT64:
+        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
+        return (long) literal.getInt_literal().getValue();
+      case FLOAT:
+        checkCorrectType(literal.isSetFloat_literal(), type, colName, literal);
+        return (float) literal.getFloat_literal().getValue();
+      case DOUBLE:
+        checkCorrectType(literal.isSetFloat_literal(), type, colName, literal);
+        return (double) literal.getFloat_literal().getValue();
+      case STRING:
+        checkCorrectType(literal.isSetString_literal(), type, colName, literal);
+        return literal.getString_literal().getValue();
+      default:
+        throw new 
ImpalaRuntimeException("Unsupported value for column type: " + + type.toString()); + } + } + + public static Encoding fromThrift(TColumnEncoding encoding) + throws ImpalaRuntimeException { + switch (encoding) { + case AUTO: + return Encoding.AUTO_ENCODING; + case PLAIN: + return Encoding.PLAIN_ENCODING; + case PREFIX: + return Encoding.PREFIX_ENCODING; + case GROUP_VARINT: + return Encoding.GROUP_VARINT; + case RLE: + return Encoding.RLE; + case DICTIONARY: + return Encoding.DICT_ENCODING; + case BIT_SHUFFLE: + return Encoding.BIT_SHUFFLE; + default: + throw new ImpalaRuntimeException("Unsupported encoding: " + + encoding.toString()); + } + } + + public static TColumnEncoding toThrift(Encoding encoding) + throws ImpalaRuntimeException { + switch (encoding) { + case AUTO_ENCODING: + return TColumnEncoding.AUTO; + case PLAIN_ENCODING: + return TColumnEncoding.PLAIN; + case PREFIX_ENCODING: + return TColumnEncoding.PREFIX; + case GROUP_VARINT: + return TColumnEncoding.GROUP_VARINT; + case RLE: + return TColumnEncoding.RLE; + case DICT_ENCODING: + return TColumnEncoding.DICTIONARY; + case BIT_SHUFFLE: + return TColumnEncoding.BIT_SHUFFLE; + default: + throw new ImpalaRuntimeException("Unsupported encoding: " + + encoding.toString()); + } + } + + public static CompressionAlgorithm fromThrift(THdfsCompression compression) + throws ImpalaRuntimeException { + switch (compression) { + case DEFAULT: + return CompressionAlgorithm.DEFAULT_COMPRESSION; + case NONE: + return CompressionAlgorithm.NO_COMPRESSION; + case SNAPPY: + return CompressionAlgorithm.SNAPPY; + case LZ4: + return CompressionAlgorithm.LZ4; + case ZLIB: + return CompressionAlgorithm.ZLIB; + default: + throw new ImpalaRuntimeException("Unsupported compression algorithm: " + + compression.toString()); + } + } + + public static THdfsCompression toThrift(CompressionAlgorithm compression) + throws ImpalaRuntimeException { + switch (compression) { + case NO_COMPRESSION: + return THdfsCompression.NONE; + case DEFAULT_COMPRESSION: + return THdfsCompression.DEFAULT; + case SNAPPY: + return THdfsCompression.SNAPPY; + case LZ4: + return THdfsCompression.LZ4; + case ZLIB: + return THdfsCompression.ZLIB; + default: + throw new ImpalaRuntimeException("Unsupported compression algorithm: " + + compression.toString()); + } + } + + public static TColumn setColumnOptions(TColumn column, boolean isKey, + Boolean isNullable, Encoding encoding, CompressionAlgorithm compression, + Expr defaultValue, Integer blockSize) { + column.setIs_key(isKey); + if (isNullable != null) column.setIs_nullable(isNullable); + try { + if (encoding != null) column.setEncoding(toThrift(encoding)); + if (compression != null) column.setCompression(toThrift(compression)); + } catch (ImpalaRuntimeException e) { + // This shouldn't happen + throw new IllegalStateException(String.format("Error parsing " + + "encoding/compression values for Kudu column '%s': %s", column.getColumnName(), + e.getMessage())); + } + + if (defaultValue != null) { + Preconditions.checkState(defaultValue instanceof LiteralExpr); + column.setDefault_value(defaultValue.treeToThrift()); + } + if (blockSize != null) column.setBlock_size(blockSize); + return column; + } + /** * If correctType is true, returns. Otherwise throws a formatted error message * indicating problems with the type of the literal of the range literal. 
diff --git a/fe/src/main/jflex/sql-scanner.flex b/fe/src/main/jflex/sql-scanner.flex index 099ceda9d..3d3b24dee 100644 --- a/fe/src/main/jflex/sql-scanner.flex +++ b/fe/src/main/jflex/sql-scanner.flex @@ -68,6 +68,7 @@ import org.apache.impala.analysis.SqlParserSymbols; keywordMap.put("between", new Integer(SqlParserSymbols.KW_BETWEEN)); keywordMap.put("bigint", new Integer(SqlParserSymbols.KW_BIGINT)); keywordMap.put("binary", new Integer(SqlParserSymbols.KW_BINARY)); + keywordMap.put("block_size", new Integer(SqlParserSymbols.KW_BLOCKSIZE)); keywordMap.put("boolean", new Integer(SqlParserSymbols.KW_BOOLEAN)); keywordMap.put("buckets", new Integer(SqlParserSymbols.KW_BUCKETS)); keywordMap.put("by", new Integer(SqlParserSymbols.KW_BY)); @@ -82,6 +83,7 @@ import org.apache.impala.analysis.SqlParserSymbols; keywordMap.put("column", new Integer(SqlParserSymbols.KW_COLUMN)); keywordMap.put("columns", new Integer(SqlParserSymbols.KW_COLUMNS)); keywordMap.put("comment", new Integer(SqlParserSymbols.KW_COMMENT)); + keywordMap.put("compression", new Integer(SqlParserSymbols.KW_COMPRESSION)); keywordMap.put("compute", new Integer(SqlParserSymbols.KW_COMPUTE)); keywordMap.put("create", new Integer(SqlParserSymbols.KW_CREATE)); keywordMap.put("cross", new Integer(SqlParserSymbols.KW_CROSS)); @@ -92,6 +94,7 @@ import org.apache.impala.analysis.SqlParserSymbols; keywordMap.put("date", new Integer(SqlParserSymbols.KW_DATE)); keywordMap.put("datetime", new Integer(SqlParserSymbols.KW_DATETIME)); keywordMap.put("decimal", new Integer(SqlParserSymbols.KW_DECIMAL)); + keywordMap.put("default", new Integer(SqlParserSymbols.KW_DEFAULT)); keywordMap.put("delete", new Integer(SqlParserSymbols.KW_DELETE)); keywordMap.put("delimited", new Integer(SqlParserSymbols.KW_DELIMITED)); keywordMap.put("desc", new Integer(SqlParserSymbols.KW_DESC)); @@ -102,6 +105,7 @@ import org.apache.impala.analysis.SqlParserSymbols; keywordMap.put("double", new Integer(SqlParserSymbols.KW_DOUBLE)); keywordMap.put("drop", new Integer(SqlParserSymbols.KW_DROP)); keywordMap.put("else", new Integer(SqlParserSymbols.KW_ELSE)); + keywordMap.put("encoding", new Integer(SqlParserSymbols.KW_ENCODING)); keywordMap.put("end", new Integer(SqlParserSymbols.KW_END)); keywordMap.put("escaped", new Integer(SqlParserSymbols.KW_ESCAPED)); keywordMap.put("exists", new Integer(SqlParserSymbols.KW_EXISTS)); diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java index 0a78dd55f..ed900bfee 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeDDLTest.java @@ -48,6 +48,8 @@ import org.apache.impala.common.FrontendTestBase; import org.apache.impala.common.RuntimeEnv; import org.apache.impala.testutil.TestUtils; import org.apache.impala.util.MetaStoreUtil; +import org.apache.kudu.ColumnSchema.CompressionAlgorithm; +import org.apache.kudu.ColumnSchema.Encoding; import org.junit.Test; import com.google.common.base.Joiner; @@ -1615,6 +1617,9 @@ public class AnalyzeDDLTest extends FrontendTestBase { // Supported file formats. Exclude Avro since it is tested separately. 
String [] fileFormats = {"TEXTFILE", "SEQUENCEFILE", "PARQUET", "PARQUETFILE", "RCFILE"}; + String [] fileFormatsStr = + {"TEXT", "SEQUENCE_FILE", "PARQUET", "PARQUET", "RC_FILE"}; + int formatIndx = 0; for (String format: fileFormats) { for (String create: ImmutableList.of("create table", "create external table")) { AnalyzesOk(String.format("%s new_table (i int) " + @@ -1625,9 +1630,11 @@ public class AnalyzeDDLTest extends FrontendTestBase { "Table requires at least 1 column"); } AnalysisError(String.format("create table t (i int primary key) stored as %s", - format), "Only Kudu tables can specify a PRIMARY KEY"); + format), String.format("Unsupported column options for file format " + + "'%s': 'i INT PRIMARY KEY'", fileFormatsStr[formatIndx])); AnalysisError(String.format("create table t (i int, primary key(i)) stored as %s", format), "Only Kudu tables can specify a PRIMARY KEY"); + formatIndx++; } // Note: Backslashes need to be escaped twice - once for Java and once for Impala. @@ -1986,7 +1993,7 @@ public class AnalyzeDDLTest extends FrontendTestBase { "partition 10 < values <= 30, partition 30 < values) " + "stored as kudu tblproperties('kudu.master_addresses'='127.0.0.1')"); // Not using the STORED AS KUDU syntax to specify a Kudu table - AnalysisError("create table tab (x int primary key) tblproperties (" + + AnalysisError("create table tab (x int) tblproperties (" + "'storage_handler'='com.cloudera.kudu.hive.KuduStorageHandler')", CreateTableStmt.KUDU_STORAGE_HANDLER_ERROR_MESSAGE); AnalysisError("create table tab (x int primary key) stored as kudu tblproperties (" + @@ -2035,6 +2042,86 @@ public class AnalyzeDDLTest extends FrontendTestBase { "distribute by hash(x) into 3 buckets stored as kudu", t); AnalysisError(stmt, expectedError); } + + // Test column options + String[] nullability = {"not null", "null", ""}; + String[] defaultVal = {"default 10", ""}; + String[] blockSize = {"block_size 4096", ""}; + for (Encoding enc: Encoding.values()) { + for (CompressionAlgorithm comp: CompressionAlgorithm.values()) { + for (String nul: nullability) { + for (String def: defaultVal) { + for (String block: blockSize) { + AnalyzesOk(String.format("create table tab (x int primary key " + + "not null encoding %s compression %s %s %s, y int encoding %s " + + "compression %s %s %s %s) distribute by hash (x) " + + "into 3 buckets stored as kudu", enc, comp, def, block, enc, + comp, def, nul, block)); + } + } + } + } + } + // Primary key specified using the PRIMARY KEY clause + AnalyzesOk("create table tab (x int not null encoding plain_encoding " + + "compression snappy block_size 1, y int null encoding rle compression lz4 " + + "default 1, primary key(x)) distribute by hash (x) into 3 buckets " + + "stored as kudu"); + // Primary keys can't be null + AnalysisError("create table tab (x int primary key null, y int not null) " + + "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " + + "cannot be nullable: x INT PRIMARY KEY NULL"); + AnalysisError("create table tab (x int not null, y int null, primary key (x, y)) " + + "distribute by hash (x) into 3 buckets stored as kudu", "Primary key columns " + + "cannot be nullable: y INT NULL"); + // Unsupported encoding value + AnalysisError("create table tab (x int primary key, y int encoding invalid_enc) " + + "distribute by hash (x) into 3 buckets stored as kudu", "Unsupported encoding " + + "value 'INVALID_ENC'. 
Supported encoding values are: " + + Joiner.on(", ").join(Encoding.values())); + // Unsupported compression algorithm + AnalysisError("create table tab (x int primary key, y int compression " + + "invalid_comp) distribute by hash (x) into 3 buckets stored as kudu", + "Unsupported compression algorithm 'INVALID_COMP'. Supported compression " + + "algorithms are: " + Joiner.on(", ").join(CompressionAlgorithm.values())); + // Default values + AnalyzesOk("create table tab (i1 tinyint default 1, i2 smallint default 10, " + + "i3 int default 100, i4 bigint default 1000, vals string default 'test', " + + "valf float default cast(1.2 as float), vald double default " + + "cast(3.1452 as double), valb boolean default true, " + + "primary key (i1, i2, i3, i4, vals)) distribute by hash (i1) into 3 " + + "buckets stored as kudu"); + AnalyzesOk("create table tab (i int primary key default 1+1+1) " + + "distribute by hash (i) into 3 buckets stored as kudu"); + AnalyzesOk("create table tab (i int primary key default factorial(5)) " + + "distribute by hash (i) into 3 buckets stored as kudu"); + AnalyzesOk("create table tab (i int primary key, x int null default " + + "isnull(null, null)) distribute by hash (i) into 3 buckets stored as kudu"); + // Invalid default values + AnalysisError("create table tab (i int primary key default 'string_val') " + + "distribute by hash (i) into 3 buckets stored as kudu", "Default value " + + "'string_val' (type: STRING) is not compatible with column 'i' (type: INT)."); + AnalysisError("create table tab (i int primary key, x int default 1.1) " + + "distribute by hash (i) into 3 buckets stored as kudu", + "Default value 1.1 (type: DECIMAL(2,1)) is not compatible with column " + + "'x' (type: INT)."); + AnalysisError("create table tab (i tinyint primary key default 128) " + + "distribute by hash (i) into 3 buckets stored as kudu", "Default value " + + "128 (type: SMALLINT) is not compatible with column 'i' (type: TINYINT)."); + AnalysisError("create table tab (i int primary key default isnull(null, null)) " + + "distribute by hash (i) into 3 buckets stored as kudu", "Default value of " + + "NULL not allowed on non-nullable column: 'i'"); + AnalysisError("create table tab (i int primary key, x int not null " + + "default isnull(null, null)) distribute by hash (i) into 3 buckets " + + "stored as kudu", "Default value of NULL not allowed on non-nullable column: " + + "'x'"); + // Invalid block_size values + AnalysisError("create table tab (i int primary key block_size 1.1) " + + "distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " + + "for BLOCK_SIZE: 1.1. A positive INTEGER value is expected."); + AnalysisError("create table tab (i int primary key block_size 'val') " + + "distribute by hash (i) into 3 buckets stored as kudu", "Invalid value " + + "for BLOCK_SIZE: 'val'. A positive INTEGER value is expected."); } @Test @@ -2279,11 +2366,12 @@ public class AnalyzeDDLTest extends FrontendTestBase { "Type 'STRUCT' is not supported as partition-column type in column: x"); // Kudu specific clauses used in an Avro table. 
- AnalysisError("create table functional.new_table (i int primary key) " + + AnalysisError("create table functional.new_table (i int) " + "distribute by hash(i) into 3 buckets stored as avro", "Only Kudu tables can use the DISTRIBUTE BY clause."); AnalysisError("create table functional.new_table (i int primary key) " + - "stored as avro", "Only Kudu tables can specify a PRIMARY KEY."); + "stored as avro", "Unsupported column options for file format 'AVRO': " + + "'i INT PRIMARY KEY'"); } @Test diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java index 69c90da49..3cef4ff5c 100644 --- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java @@ -929,8 +929,8 @@ public class ParserTest extends FrontendTestBase { @Test public void TestIdentQuoting() { ParsesOk("select a from `t`"); - ParsesOk("select a from `default`.`t`"); - ParsesOk("select a from `default`.t"); + ParsesOk("select a from default.`t`"); + ParsesOk("select a from default.t"); ParsesOk("select a from default.`t`"); ParsesOk("select 01a from default.`01_t`"); @@ -962,7 +962,7 @@ public class ParserTest extends FrontendTestBase { // Quoted identifiers can contain any characters except "`". ParsesOk("select a from `all types`"); - ParsesOk("select a from `default`.`all types`"); + ParsesOk("select a from default.`all types`"); ParsesOk("select a from `~!@#$%^&*()-_=+|;:'\",<.>/?`"); // Quoted identifiers do not unescape escape sequences. ParsesOk("select a from `ab\rabc`"); @@ -1676,7 +1676,7 @@ public class ParserTest extends FrontendTestBase { @Test public void TestKuduUpdate() { - TestUtils.assumeKuduIsSupported(); + //TestUtils.assumeKuduIsSupported(); ParserError("update (select * from functional_kudu.testtbl) a set name = '10'"); } @@ -2456,6 +2456,51 @@ public class ParserTest extends FrontendTestBase { "(PARTITION VALUES = 10) STORED AS KUDU"); ParserError("CREATE TABLE Foo (a int) DISTRIBUTE BY RANGE (a) " + "(PARTITION 10 < VALUE < 20) STORED AS KUDU"); + + // Column options for Kudu tables + String[] encodings = {"encoding auto_encoding", "encoding plain_encoding", + "encoding prefix_encoding", "encoding group_varint", "encoding rle", + "encoding dict_encoding", "encoding bit_shuffle", "encoding unknown", ""}; + String[] compression = {"compression default_compression", + "compression no_compression", "compression snappy", "compression lz4", + "compression zlib", "compression unknown", ""}; + + String[] nullability = {"not null", "null", ""}; + String[] defaultVal = {"default 10", ""}; + String[] blockSize = {"block_size 4096", ""}; + for (String enc: encodings) { + for (String comp: compression) { + for (String nul: nullability) { + for (String def: defaultVal) { + for (String block: blockSize) { + ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " + + "%s %s %s %s %s) STORED AS KUDU", nul, enc, comp, def, block)); + ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " + + "%s %s %s %s %s) STORED AS KUDU", block, nul, enc, comp, def)); + ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " + + "%s %s %s %s %s) STORED AS KUDU", def, block, nul, enc, comp)); + ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " + + "%s %s %s %s %s) STORED AS KUDU", comp, def, block, nul, enc)); + ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " + + "%s %s %s %s %s) STORED AS KUDU", enc, comp, def, block, nul)); + 
ParsesOk(String.format("CREATE TABLE Foo (i int PRIMARY KEY " + + "%s %s %s %s %s) STORED AS KUDU", enc, comp, block, def, nul)); + } + } + } + } + } + // Column option is specified multiple times for the same column + ParserError("CREATE TABLE Foo(a int PRIMARY KEY ENCODING RLE ENCODING PLAIN) " + + "STORED AS KUDU"); + // Constant expr used in DEFAULT + ParsesOk("CREATE TABLE Foo(a int PRIMARY KEY, b int DEFAULT 1+1) STORED AS KUDU"); + ParsesOk("CREATE TABLE Foo(a int PRIMARY KEY, b float DEFAULT cast(1.1 as float)) " + + "STORED AS KUDU"); + // Non-literal value used in BLOCK_SIZE + ParserError("CREATE TABLE Foo(a int PRIMARY KEY, b int BLOCK_SIZE 1+1) " + + "STORED AS KUDU"); + ParserError("CREATE TABLE Foo(a int PRIMARY KEY BLOCK_SIZE -1) STORED AS KUDU"); } @Test @@ -2886,7 +2931,7 @@ public class ParserTest extends FrontendTestBase { "select from t\n" + " ^\n" + "Encountered: FROM\n" + - "Expected: ALL, CASE, CAST, DISTINCT, EXISTS, " + + "Expected: ALL, CASE, CAST, DEFAULT, DISTINCT, EXISTS, " + "FALSE, IF, INTERVAL, NOT, NULL, " + "STRAIGHT_JOIN, TRUNCATE, TRUE, IDENTIFIER\n"); @@ -2896,8 +2941,8 @@ public class ParserTest extends FrontendTestBase { "select c, b, c where a = 5\n" + " ^\n" + "Encountered: WHERE\n" + - "Expected: AND, AS, BETWEEN, DIV, FROM, ILIKE, IN, IREGEXP, IS, LIKE, LIMIT, NOT, OR, " + - "ORDER, REGEXP, RLIKE, UNION, COMMA, IDENTIFIER\n"); + "Expected: AND, AS, BETWEEN, DEFAULT, DIV, FROM, ILIKE, IN, IREGEXP, IS, LIKE, " + + "LIMIT, NOT, OR, ORDER, REGEXP, RLIKE, UNION, COMMA, IDENTIFIER\n"); // missing table list ParserError("select c, b, c from where a = 5", @@ -2905,7 +2950,7 @@ public class ParserTest extends FrontendTestBase { "select c, b, c from where a = 5\n" + " ^\n" + "Encountered: WHERE\n" + - "Expected: IDENTIFIER\n"); + "Expected: DEFAULT, IDENTIFIER\n"); // missing predicate in where clause (no group by) ParserError("select c, b, c from t where", @@ -2913,7 +2958,7 @@ public class ParserTest extends FrontendTestBase { "select c, b, c from t where\n" + " ^\n" + "Encountered: EOF\n" + - "Expected: CASE, CAST, EXISTS, FALSE, " + + "Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " + "IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n"); // missing predicate in where clause (group by) @@ -2922,7 +2967,7 @@ public class ParserTest extends FrontendTestBase { "select c, b, c from t where group by a, b\n" + " ^\n" + "Encountered: GROUP\n" + - "Expected: CASE, CAST, EXISTS, FALSE, " + + "Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " + "IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n"); // unmatched string literal starting with " @@ -2983,7 +3028,7 @@ public class ParserTest extends FrontendTestBase { "...c,c,c,c,c,c,c,c,cd,c,d,d, ,c, from t\n" + " ^\n" + "Encountered: COMMA\n" + - "Expected: CASE, CAST, EXISTS, FALSE, " + + "Expected: CASE, CAST, DEFAULT, EXISTS, FALSE, " + "IF, INTERVAL, NOT, NULL, TRUNCATE, TRUE, IDENTIFIER\n"); // Parsing identifiers that have different names printed as EXPECTED @@ -3004,7 +3049,7 @@ public class ParserTest extends FrontendTestBase { "USE ` `\n" + " ^\n" + "Encountered: EMPTY IDENTIFIER\n" + - "Expected: IDENTIFIER\n"); + "Expected: DEFAULT, IDENTIFIER\n"); // Expecting = token ParserError("SET foo", diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index be6b232b6..1940c311a 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -551,20 
+551,20 @@ DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}_idx; CREATE TABLE {db_name}{db_suffix}.{table_name}_idx ( kudu_idx BIGINT PRIMARY KEY, - id INT, - bool_col BOOLEAN, - tinyint_col TINYINT, - smallint_col SMALLINT, - int_col INT, - bigint_col BIGINT, - float_col FLOAT, - double_col DOUBLE, - date_string_col STRING, - string_col STRING, - timestamp_col STRING, - year INT, - month INT, - day INT + id INT NULL, + bool_col BOOLEAN NULL, + tinyint_col TINYINT NULL, + smallint_col SMALLINT NULL, + int_col INT NULL, + bigint_col BIGINT NULL, + float_col FLOAT NULL, + double_col DOUBLE NULL, + date_string_col STRING NULL, + string_col STRING NULL, + timestamp_col STRING NULL, + year INT NULL, + month INT NULL, + day INT NULL ) DISTRIBUTE BY HASH (kudu_idx) INTO 3 BUCKETS STORED AS KUDU; CREATE VIEW {db_name}{db_suffix}.{table_name} AS @@ -763,8 +763,8 @@ delimited fields terminated by ',' escaped by '\\' DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}; create table {db_name}{db_suffix}.{table_name} ( id bigint primary key, - name string, - zip int + name string null, + zip int null ) distribute by range(id) (partition values <= 1003, partition 1003 < values <= 1007, partition 1007 < values) stored as kudu; @@ -1310,7 +1310,8 @@ OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name}; ---- CREATE_KUDU DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}; create table {db_name}{db_suffix}.{table_name} ( - a string primary key, b string, c string, d int, e double, f string, g string + a string primary key, b string null, c string null, d int null, e double null, + f string null, g string null ) distribute by hash(a) into 3 buckets stored as kudu; ==== @@ -1412,10 +1413,10 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/ImpalaDemoDataset/DEC_00_SF3_P077 DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}; create table {db_name}{db_suffix}.{table_name} ( id string primary key, - zip string, - description1 string, - description2 string, - income int) + zip string null, + description1 string null, + description2 string null, + income int null) distribute by range(id) (partition values <= '8600000US01475', partition '8600000US01475' < values <= '8600000US63121', diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_delete.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_delete.test index ac1bcc443..9e2a9241f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_delete.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_delete.test @@ -1,8 +1,8 @@ ==== ---- QUERY create table tdata - (id int primary key, valf float, vali bigint, valv string, valb boolean, valt tinyint, - vals smallint, vald double) + (id int primary key, valf float null, vali bigint null, valv string null, + valb boolean null, valt tinyint null, vals smallint null, vald double null) DISTRIBUTE BY RANGE (PARTITION VALUES < 100, PARTITION 100 <= VALUES < 1000, PARTITION 1000 <= VALUES <= 10000) STORED AS KUDU ---- RESULTS @@ -297,8 +297,9 @@ INT,FLOAT,BIGINT,STRING,BOOLEAN,TINYINT,SMALLINT,DOUBLE ==== ---- QUERY create table multiple_key_cols - (string_col string, bigint_col bigint, tinyint_col tinyint, smallint_col smallint, - bool_col boolean, int_col int, double_col double, float_col float, + (string_col string, bigint_col bigint, tinyint_col tinyint, + smallint_col smallint, bool_col boolean null, int_col int null, + double_col double null, float_col float null, primary key (string_col, bigint_col, tinyint_col, smallint_col)) DISTRIBUTE BY 
HASH (string_col) INTO 16 BUCKETS STORED AS KUDU ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test index 44cae4a95..759dc5eed 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_insert.test @@ -1,8 +1,8 @@ ==== ---- QUERY create table tdata - (id int primary key, valf float, vali bigint, valv string, valb boolean, valt tinyint, - vals smallint, vald double) + (id int primary key, valf float null, vali bigint null, valv string null, + valb boolean null, valt tinyint null, vals smallint null, vald double null) DISTRIBUTE BY RANGE (PARTITION VALUES < 10, PARTITION 10 <= VALUES < 30, PARTITION 30 <= VALUES) STORED AS KUDU ---- RESULTS diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test index fe0a6b1f1..27120f5b2 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_update.test @@ -1,8 +1,9 @@ ==== ---- QUERY create table tdata - (id int primary key, name string, valf float, vali bigint, valv string, valb boolean, - valt tinyint, vals smallint, vald double) + (id int primary key, name string null, valf float null, vali bigint null, + valv string null, valb boolean null, valt tinyint null, vals smallint null, + vald double null) DISTRIBUTE BY RANGE (PARTITION VALUES < 10, PARTITION 10 <= VALUES < 30, PARTITION 30 <= VALUES <= 10000) STORED AS KUDU ---- RESULTS @@ -337,4 +338,4 @@ update tdata set vali = -1 ---- RUNTIME_PROFILE NumModifiedRows: 7300 NumRowErrors: 0 -==== \ No newline at end of file +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_upsert.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_upsert.test index 5b5752af7..0f117f128 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/kudu_upsert.test +++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_upsert.test @@ -1,8 +1,9 @@ ==== ---- QUERY create table tdata - (id int primary key, name string, valf float, vali bigint, valv string, valb boolean, - valt tinyint, vals smallint, vald double) + (id int primary key, name string null, valf float null, vali bigint null, + valv string null, valb boolean null, valt tinyint null, vals smallint null, + vald double null) DISTRIBUTE BY RANGE (PARTITION VALUES < 10, PARTITION 10 <= VALUES < 30, PARTITION 30 <= VALUES) STORED AS KUDU ---- RESULTS @@ -389,8 +390,8 @@ NumRowErrors: 0 ---- QUERY create table multiple_key_cols (string_col string, bigint_col bigint, tinyint_col tinyint, smallint_col smallint, - bool_col boolean, int_col int, double_col double, float_col float, - primary key (string_col, bigint_col, tinyint_col, smallint_col)) + bool_col boolean null, int_col int null, double_col double null, + float_col float null, primary key (string_col, bigint_col, tinyint_col, smallint_col)) DISTRIBUTE BY HASH (string_col) INTO 16 BUCKETS STORED AS KUDU ==== ---- QUERY @@ -488,4 +489,4 @@ upsert into table multiple_key_cols (string_col, tinyint_col, smallint_col) values ('a', 1, 1) ---- CATCH All primary key columns must be specified for UPSERTing into Kudu tables. 
Missing columns are: bigint_col -==== \ No newline at end of file +==== diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py index 908fc545c..996931d5b 100644 --- a/tests/query_test/test_kudu.py +++ b/tests/query_test/test_kudu.py @@ -64,6 +64,27 @@ class TestKuduOperations(KuduTestSuite): def test_kudu_stats(self, vector, unique_database): self.run_test_case('QueryTest/kudu_stats', vector, use_db=unique_database) + def test_kudu_column_options(self, cursor, kudu_client, unique_database): + encodings = ["ENCODING PLAIN_ENCODING", ""] + compressions = ["COMPRESSION SNAPPY", ""] + nullability = ["NOT NULL", "NULL", ""] + defaults = ["DEFAULT 1", ""] + blocksizes = ["BLOCK_SIZE 32768", ""] + indx = 1 + for encoding in encodings: + for compression in compressions: + for default in defaults: + for blocksize in blocksizes: + for nullable in nullability: + impala_tbl_name = "test_column_options_%s" % str(indx) + cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY + %s %s %s %s, b INT %s %s %s %s %s) DISTRIBUTE BY HASH (a) INTO 3 + BUCKETS STORED AS KUDU""" % (unique_database, impala_tbl_name, + encoding, compression, default, blocksize, nullable, encoding, + compression, default, blocksize)) + indx = indx + 1 + kudu_tbl_name = "impala::%s.%s" % (unique_database, impala_tbl_name) + assert kudu_client.table_exists(kudu_tbl_name) class TestCreateExternalTable(KuduTestSuite): @@ -228,13 +249,14 @@ class TestShowCreateTable(KuduTestSuite): def test_primary_key_and_distribution(self, cursor): # TODO: Add test cases with column comments once KUDU-1711 is fixed. + # TODO: Add case with BLOCK_SIZE self.assert_show_create_equals(cursor, """ CREATE TABLE {table} (c INT PRIMARY KEY) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS STORED AS KUDU""", """ CREATE TABLE {db}.{{table}} ( - c INT, + c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, PRIMARY KEY (c) ) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS @@ -243,14 +265,14 @@ class TestShowCreateTable(KuduTestSuite): db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) self.assert_show_create_equals(cursor, """ - CREATE TABLE {table} (c INT PRIMARY KEY, d STRING) + CREATE TABLE {table} (c INT PRIMARY KEY, d STRING NULL) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2, PARTITION 2 < VALUES) STORED AS KUDU""", """ CREATE TABLE {db}.{{table}} ( - c INT, - d STRING, + c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, + d STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, PRIMARY KEY (c) ) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) (...) 
@@ -259,11 +281,11 @@ class TestShowCreateTable(KuduTestSuite): db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) self.assert_show_create_equals(cursor, """ - CREATE TABLE {table} (c INT, PRIMARY KEY (c)) + CREATE TABLE {table} (c INT ENCODING PLAIN_ENCODING, PRIMARY KEY (c)) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS STORED AS KUDU""", """ CREATE TABLE {db}.{{table}} ( - c INT, + c INT NOT NULL ENCODING PLAIN_ENCODING COMPRESSION DEFAULT_COMPRESSION, PRIMARY KEY (c) ) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS @@ -272,14 +294,14 @@ class TestShowCreateTable(KuduTestSuite): db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) self.assert_show_create_equals(cursor, """ - CREATE TABLE {table} (c INT, d STRING, PRIMARY KEY(c, d)) + CREATE TABLE {table} (c INT COMPRESSION LZ4, d STRING, PRIMARY KEY(c, d)) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, RANGE (c, d) (PARTITION VALUE = (1, 'aaa'), PARTITION VALUE = (2, 'bbb')) STORED AS KUDU""", """ CREATE TABLE {db}.{{table}} ( - c INT, - d STRING, + c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4, + d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, PRIMARY KEY (c, d) ) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, RANGE (c, d) (...) @@ -288,14 +310,14 @@ class TestShowCreateTable(KuduTestSuite): db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS)) self.assert_show_create_equals(cursor, """ - CREATE TABLE {table} (c INT, d STRING, e INT, PRIMARY KEY(c, d)) + CREATE TABLE {table} (c INT, d STRING, e INT NULL DEFAULT 10, PRIMARY KEY(c, d)) DISTRIBUTE BY RANGE (c) (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2, PARTITION 2 < VALUES <= 3, PARTITION 3 < VALUES) STORED AS KUDU""", """ CREATE TABLE {db}.{{table}} ( - c INT, - d STRING, - e INT, + c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, + d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, + e INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT 10, PRIMARY KEY (c, d) ) DISTRIBUTE BY RANGE (c) (...) @@ -316,7 +338,7 @@ class TestShowCreateTable(KuduTestSuite): TBLPROPERTIES ({props})""".format(props=props), """ CREATE TABLE {db}.{{table}} ( - c INT, + c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, PRIMARY KEY (c) ) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS @@ -335,7 +357,7 @@ class TestShowCreateTable(KuduTestSuite): TBLPROPERTIES ({props})""".format(props=props), """ CREATE TABLE {db}.{{table}} ( - c INT, + c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION, PRIMARY KEY (c) ) DISTRIBUTE BY HASH (c) INTO 3 BUCKETS diff --git a/tests/shell/test_shell_commandline.py b/tests/shell/test_shell_commandline.py index d880c2dbf..5b9859cc5 100644 --- a/tests/shell/test_shell_commandline.py +++ b/tests/shell/test_shell_commandline.py @@ -478,7 +478,7 @@ class TestImpalaShell(ImpalaTestSuite): def test_kudu_dml_reporting(self, unique_database): db = unique_database run_impala_shell_cmd('--query="create table %s.dml_test (id int primary key, '\ - 'age int) distribute by hash(id) into 2 buckets stored as kudu"' % db) + 'age int null) distribute by hash(id) into 2 buckets stored as kudu"' % db) self._validate_dml_stmt("insert into %s.dml_test (id) values (7), (7)" % db, 1, 1) self._validate_dml_stmt("insert into %s.dml_test (id) values (7)" % db, 0, 1)
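[Reviewer note, not part of the patch] To make the end-to-end mapping concrete, the sketch below hand-expands what the new createTableSchema() path produces on the Kudu side for one sample DDL statement. The table name "metrics", the columns "host" and "val", and the option values are made up for illustration; the builder calls mirror the ones createTableSchema() issues, under the assumption that the Kudu client jars are on the classpath:

import java.util.Arrays;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
import org.apache.kudu.ColumnSchema.Encoding;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;

// For a statement like:
//   CREATE TABLE metrics (
//     host STRING PRIMARY KEY ENCODING PREFIX_ENCODING COMPRESSION LZ4,
//     val BIGINT NOT NULL DEFAULT 0 BLOCK_SIZE 8192)
//   DISTRIBUTE BY HASH (host) INTO 3 BUCKETS STORED AS KUDU
// createTableSchema() builds roughly the following Kudu schema.
public class ColumnOptionsSketch {
  public static Schema metricsSchema() {
    // host STRING PRIMARY KEY ENCODING PREFIX_ENCODING COMPRESSION LZ4
    ColumnSchema host = new ColumnSchemaBuilder("host", Type.STRING)
        .key(true)  // PRIMARY KEY; key columns are implicitly NOT NULL
        .encoding(Encoding.PREFIX_ENCODING)
        .compressionAlgorithm(CompressionAlgorithm.LZ4)
        .build();
    // val BIGINT NOT NULL DEFAULT 0 BLOCK_SIZE 8192
    ColumnSchema val = new ColumnSchemaBuilder("val", Type.INT64)
        .nullable(false)
        .defaultValue(0L)  // getKuduDefaultValue() casts the literal to long for INT64
        .desiredBlockSize(8192)
        .build();
    return new Schema(Arrays.asList(host, val));
  }
}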