IMPALA-3718: Support subset of functional-query for Kudu

Adds initial support for the functional-query test workload
for Kudu tables.

There are a few issues that make loading the functional
schema difficult on Kudu:
 1) Kudu tables must have one or more columns that together
    constitute a unique primary key.
   a) Primary key columns must currently be the first columns
      in the table definition (KUDU-1271).
   b) Primary key columns cannot be nullable (KUDU-1570).
 2) Kudu tables must be created with distribution
    parameters. (A minimal CREATE TABLE satisfying these
    constraints is sketched below.)
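
For reference, here is a minimal CREATE TABLE satisfying
both constraints, in the same syntax used by the DDL added
in this patch; the table and column names are hypothetical:

  CREATE TABLE functional_kudu.example_tbl (
    -- Key column: listed first (KUDU-1271), non-nullable (KUDU-1570).
    id BIGINT,
    name STRING
  )
  DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
  TBLPROPERTIES(
    'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
    'kudu.table_name' = 'example_tbl',
    'kudu.master_addresses' = '127.0.0.1:7051',
    'kudu.key_columns' = 'id'
  );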

(1) limits the tables that can be loaded without ugly
workarounds. This patch only includes important tables that
are used by relevant tests, most notably the alltypes*
family. In particular, alltypesagg is important, but it has
no set of non-nullable columns that forms a unique primary
key. As a result, that table is created in Kudu under a
different name, with an additional BIGINT primary key
column whose unique values are generated at data loading
time using the ROW_NUMBER analytic function. A view that
matches the alltypesagg schema exactly is then created over
the underlying table. When KUDU-1570 is resolved, this can
be simplified.
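
A simplified sketch of that pattern on a hypothetical
two-column table (the real DDL and DML for alltypesagg are
in the schema template changes below):

  CREATE TABLE functional_kudu.example_agg_idx (
    kudu_idx BIGINT,  -- synthetic unique primary key
    id INT,           -- original columns, possibly nullable
    val STRING
  )
  DISTRIBUTE BY HASH (kudu_idx) INTO 3 BUCKETS
  TBLPROPERTIES(
    'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
    'kudu.table_name' = 'example_agg',
    'kudu.master_addresses' = '127.0.0.1:7051',
    'kudu.key_columns' = 'kudu_idx'
  );

  -- The view exposes the original schema, hiding the PK column.
  CREATE VIEW functional_kudu.example_agg AS
  SELECT id, val FROM functional_kudu.example_agg_idx;

  -- The load generates the PK values with ROW_NUMBER.
  INSERT INTO TABLE functional_kudu.example_agg_idx
  SELECT row_number() OVER (ORDER BY id), id, val
  FROM functional.example_agg;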

(2) requires additional considerations and custom syntax.
As a result, the DDL to create the tables is explicitly
specified in CREATE_KUDU sections in
functional_schema_template.sql, and an additional
DEPENDENT_LOAD_KUDU section was added to specify custom
data loading DML that differs from the existing
DEPENDENT_LOAD.
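
A minimal, hypothetical template entry using the new
sections would look like:

  ---- CREATE_KUDU
  CREATE TABLE {db_name}{db_suffix}.{table_name} (
    id BIGINT,
    name STRING
  )
  DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
  TBLPROPERTIES(
    'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
    'kudu.table_name' = '{table_name}',
    'kudu.master_addresses' = '127.0.0.1:7051',
    'kudu.key_columns' = 'id'
  );
  ---- DEPENDENT_LOAD_KUDU
  INSERT INTO TABLE {db_name}{db_suffix}.{table_name}
  SELECT id, name FROM {db_name}.{table_name};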

TODO: IMPALA-4005: generate_schema_statements.py needs refactoring

Tests that are not relevant or not yet supported have been
marked with xfail or skip, as appropriate.

TODO: Support remaining functional tables/tests when possible.

Change-Id: Iada88e078352e4462745d9a9a1b5111260d21acc
Reviewed-on: http://gerrit.cloudera.org:8080/4175
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Internal Jenkins

@@ -126,7 +126,7 @@ if [[ -z "${KUDU_IS_SUPPORTED-}" ]]; then
fi
DISTRO_VERSION="$(lsb_release -sir 2>&1)"
if [[ $? -ne 0 ]]; then
echo lsb_release cammond failed, output was: "$DISTRO_VERSION" 1>&2
echo lsb_release command failed, output was: "$DISTRO_VERSION" 1>&2
return 1
fi
# Remove spaces, trim minor versions, and convert to lowercase.

@@ -41,7 +41,7 @@ ${COMPUTE_STATS_SCRIPT} --db_names=tpch_nested_parquet
${COMPUTE_STATS_SCRIPT} --db_names=tpcds
if "$KUDU_IS_SUPPORTED"; then
${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu --table_names=zipcode_incomes
${COMPUTE_STATS_SCRIPT} --db_names=functional_kudu
${COMPUTE_STATS_SCRIPT} --db_names=tpch_kudu
fi

@@ -529,14 +529,17 @@ def generate_statements(output_name, test_vectors, sections,
create = section['CREATE']
create_hive = section['CREATE_HIVE']
if file_format == 'kudu':
create_kudu = section["CREATE_KUDU"]
else:
create_kudu = None
table_properties = section['TABLE_PROPERTIES']
insert = eval_section(section['DEPENDENT_LOAD'])
load = eval_section(section['LOAD'])
if file_format == 'kudu':
create_kudu = section["CREATE_KUDU"]
if section['DEPENDENT_LOAD_KUDU']:
insert = eval_section(section['DEPENDENT_LOAD_KUDU'])
else:
create_kudu = None
# For some datasets we may want to use a different load strategy when running local
# tests versus tests against large scale factors. The most common reason is to
# reduce the number of partitions for the local test environment
@@ -585,9 +588,10 @@ def generate_statements(output_name, test_vectors, sections,
# Impala CREATE TABLE doesn't allow INPUTFORMAT.
output = hive_output
# TODO: Currently, Kudu does not support partitioned tables via Impala
if file_format == 'kudu' and partition_columns != '':
print "Ignore partitions on Kudu"
# TODO: Currently, Kudu does not support partitioned tables via Impala.
# If a CREATE_KUDU section was provided, assume it handles the partition columns
if file_format == 'kudu' and partition_columns != '' and not create_kudu:
print "Ignore partitions on Kudu table: %s.%s" % (db_name, table_name)
continue
# If a CREATE section is provided, use that. Otherwise a COLUMNS section
@@ -695,7 +699,7 @@ def generate_statements(output_name, test_vectors, sections,
def parse_schema_template_file(file_name):
VALID_SECTION_NAMES = ['DATASET', 'BASE_TABLE_NAME', 'COLUMNS', 'PARTITION_COLUMNS',
'ROW_FORMAT', 'CREATE', 'CREATE_HIVE', 'CREATE_KUDU',
'DEPENDENT_LOAD', 'LOAD',
'DEPENDENT_LOAD', 'DEPENDENT_LOAD_KUDU', 'LOAD',
'LOAD_LOCAL', 'ALTER', 'HBASE_COLUMN_FAMILIES', 'TABLE_PROPERTIES']
return parse_test_file(file_name, VALID_SECTION_NAMES, skip_unknown_sections=False)

@@ -76,6 +76,34 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypes/100901.txt' OVERW
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypes/101001.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=10);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypes/101101.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=11);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypes/101201.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=12);
---- CREATE_KUDU
CREATE TABLE {db_name}{db_suffix}.{table_name} (
id INT,
bool_col BOOLEAN,
tinyint_col TINYINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col STRING,
string_col STRING,
timestamp_col STRING,
year INT,
month INT
)
DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = '{table_name}',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.key_columns' = 'id'
);
---- DEPENDENT_LOAD_KUDU
INSERT into TABLE {db_name}{db_suffix}.{table_name}
SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col,
cast(timestamp_col as string), year, month
FROM {db_name}.{table_name};
====
---- DATASET
functional
@@ -131,6 +159,34 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesSmall/090101.txt'
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesSmall/090201.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2009, month=2);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesSmall/090301.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2009, month=3);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesSmall/090401.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2009, month=4);
---- CREATE_KUDU
CREATE TABLE {db_name}{db_suffix}.{table_name} (
id INT,
bool_col BOOLEAN,
tinyint_col TINYINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col STRING,
string_col STRING,
timestamp_col STRING,
year INT,
month INT
)
DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = '{table_name}',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.key_columns' = 'id'
);
---- DEPENDENT_LOAD_KUDU
INSERT into TABLE {db_name}{db_suffix}.{table_name}
SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col,
cast(timestamp_col as string), year, month
FROM {db_name}.{table_name};
====
---- DATASET
functional
@@ -167,6 +223,34 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesTiny/090101.txt' O
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesTiny/090201.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2009, month=2);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesTiny/090301.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2009, month=3);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesTiny/090401.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2009, month=4);
---- CREATE_KUDU
CREATE TABLE {db_name}{db_suffix}.{table_name} (
id INT,
bool_col BOOLEAN,
tinyint_col TINYINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col STRING,
string_col STRING,
timestamp_col STRING,
year INT,
month INT
)
DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = '{table_name}',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.key_columns' = 'id'
);
---- DEPENDENT_LOAD_KUDU
INSERT INTO TABLE {db_name}{db_suffix}.{table_name}
SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col,
cast(timestamp_col as string), year, month
FROM {db_name}.{table_name};
====
---- DATASET
functional
@@ -476,6 +560,46 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAgg/100108.txt' OV
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAgg/100109.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=1, day=9);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAgg/100110.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=1, day=10);
INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} partition (year, month, day) SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col, year, month, tinyint_col as day FROM {db_name}.{table_name} WHERE year=2010 and month=1 and day IS NOT NULL and tinyint_col IS NULL order by id;
---- CREATE_KUDU
DROP VIEW IF EXISTS {db_name}{db_suffix}.{table_name};
DROP TABLE IF EXISTS {db_name}{db_suffix}.{table_name}_idx;
CREATE TABLE {db_name}{db_suffix}.{table_name}_idx (
kudu_idx BIGINT,
id INT,
bool_col BOOLEAN,
tinyint_col TINYINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col STRING,
string_col STRING,
timestamp_col STRING,
year INT,
month INT,
day INT
)
DISTRIBUTE BY HASH (kudu_idx) INTO 3 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = '{table_name}',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.key_columns' = 'kudu_idx'
);
CREATE VIEW {db_name}{db_suffix}.{table_name} AS
SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
double_col, date_string_col, string_col, timestamp_col, year, month, day
FROM {db_name}{db_suffix}.{table_name}_idx;
---- DEPENDENT_LOAD_KUDU
INSERT into TABLE {db_name}{db_suffix}.{table_name}_idx
SELECT row_number() over (order by year, month, id, day),
id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
double_col, date_string_col, string_col,
cast(timestamp_col as string), year, month, day
FROM {db_name}.{table_name};
====
---- DATASET
functional
@@ -525,6 +649,36 @@ LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAggNoNulls/100107.
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAggNoNulls/100108.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=1, day=8);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAggNoNulls/100109.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=1, day=9);
LOAD DATA LOCAL INPATH '{impala_home}/testdata/target/AllTypesAggNoNulls/100110.txt' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name} PARTITION(year=2010, month=1, day=10);
---- CREATE_KUDU
CREATE TABLE {db_name}{db_suffix}.{table_name} (
id INT,
bool_col BOOLEAN,
tinyint_col TINYINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_string_col STRING,
string_col STRING,
timestamp_col STRING,
year INT,
month INT,
day INT
)
DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = '{table_name}',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.key_columns' = 'id'
);
---- DEPENDENT_LOAD_KUDU
INSERT into TABLE {db_name}{db_suffix}.{table_name}
SELECT id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col,
double_col, date_string_col, string_col,
cast(timestamp_col as string), year, month, day
FROM {db_name}.{table_name};
====
---- DATASET
functional
@@ -632,6 +786,19 @@ name string
zip int
---- ROW_FORMAT
delimited fields terminated by ',' escaped by '\\'
---- CREATE_KUDU
create table {db_name}{db_suffix}.{table_name} (
id bigint,
name string,
zip int
)
distribute by range(id) split rows ((1003), (1007))
tblproperties (
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.table_name' = '{table_name}',
'kudu.key_columns' = 'id'
);
====
---- DATASET
functional
@@ -676,6 +843,20 @@ delimited fields terminated by ',' escaped by '\\'
INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}.{table_name};
---- LOAD
LOAD DATA LOCAL INPATH '{impala_home}/testdata/JoinTbl/data.csv' OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
---- CREATE_KUDU
create table {db_name}{db_suffix}.{table_name} (
test_id bigint,
test_name string,
test_zip int,
alltypes_id int
)
distribute by range(test_id) split rows ((1003), (1007))
tblproperties (
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.table_name' = '{table_name}',
'kudu.key_columns' = 'test_id, test_name, test_zip, alltypes_id'
);
====
---- DATASET
functional
@@ -1008,6 +1189,18 @@ emptytable
f2 int
---- COLUMNS
field string
---- CREATE_KUDU
CREATE TABLE {db_name}{db_suffix}.{table_name} (
field STRING,
f2 INT
)
DISTRIBUTE BY HASH (field) INTO 3 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = '{table_name}',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.key_columns' = 'field'
);
====
---- DATASET
functional
@@ -1164,6 +1357,17 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} select * from functiona
---- LOAD
LOAD DATA LOCAL INPATH '{impala_home}/testdata/NullTable/data.csv'
OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
---- CREATE_KUDU
create table {db_name}{db_suffix}.{table_name} (
a string, b string, c string, d int, e double, f string, g string
)
distribute by hash(a) into 3 buckets
tblproperties (
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.table_name' = '{table_name}',
'kudu.key_columns' = 'a'
);
====
---- DATASET
functional
@@ -1184,6 +1388,17 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} select * from functiona
---- LOAD
LOAD DATA LOCAL INPATH '{impala_home}/testdata/NullTable/data.csv'
OVERWRITE INTO TABLE {db_name}{db_suffix}.{table_name};
---- CREATE_KUDU
create table {db_name}{db_suffix}.{table_name} (
a string, b string, c string, d int, e double, f string, g string
)
distribute by hash(a) into 3 buckets
tblproperties (
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.master_addresses' = '127.0.0.1:7051',
'kudu.table_name' = '{table_name}',
'kudu.key_columns' = 'a'
);
====
---- DATASET
functional

@@ -169,12 +169,20 @@ table_name:large_multistream_bzip2_tbl, constraint:restrict_to, table_format:tex
# Kudu can't handle certain types such as timestamp so we pick and choose the tables
# we actually use for Kudu related tests.
table_name:alltypes, constraint:only, table_format:kudu/none/none
table_name:alltypessmall, constraint:only, table_format:kudu/none/none
table_name:alltypestiny, constraint:only, table_format:kudu/none/none
table_name:alltypesagg, constraint:only, table_format:kudu/none/none
table_name:alltypesaggnonulls, constraint:only, table_format:kudu/none/none
table_name:testtbl, constraint:only, table_format:kudu/none/none
table_name:jointbl, constraint:only, table_format:kudu/none/none
table_name:emptytable, constraint:only, table_format:kudu/none/none
table_name:dimtbl, constraint:only, table_format:kudu/none/none
table_name:text_comma_backslash_newline, constraint:only, table_format:kudu/none/none
table_name:tinytable, constraint:only, table_format:kudu/none/none
table_name:tinyinttable, constraint:only, table_format:kudu/none/none
table_name:zipcode_incomes, constraint:only, table_format:kudu/none/none
table_name:nulltable, constraint:only, table_format:kudu/none/none
table_name:nullescapedtable, constraint:only, table_format:kudu/none/none
# Skipping header lines is only effective with text tables
table_name:table_with_header, constraint:restrict_to, table_format:text/none/none

@@ -42,44 +42,48 @@ where a.id = b.id and a.id in (select id from functional.alltypes)
DELETE FROM KUDU [functional_kudu.testtbl]
| check keys exist: false
|
04:HASH JOIN [LEFT SEMI JOIN]
| hash predicates: a.id = id
| runtime filters: RF000 <- id
04:HASH JOIN [RIGHT SEMI JOIN]
| hash predicates: id = a.id
| runtime filters: RF000 <- a.id
|
|--02:SCAN HDFS [functional.alltypes]
|--03:HASH JOIN [INNER JOIN]
| | hash predicates: b.id = a.id
| | runtime filters: RF001 <- a.id
| |
| |--00:SCAN KUDU [functional_kudu.testtbl a]
| |
| 01:SCAN HDFS [functional.alltypes b]
| partitions=24/24 files=24 size=478.45KB
| runtime filters: RF001 -> b.id
|
03:HASH JOIN [INNER JOIN]
| hash predicates: b.id = a.id
| runtime filters: RF001 <- a.id
|
|--00:SCAN KUDU [functional_kudu.testtbl a]
|
01:SCAN HDFS [functional.alltypes b]
02:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
runtime filters: RF000 -> b.id, RF001 -> b.id
runtime filters: RF000 -> id
---- DISTRIBUTEDPLAN
DELETE FROM KUDU [functional_kudu.testtbl]
| check keys exist: false
|
04:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
| hash predicates: a.id = id
| runtime filters: RF000 <- id
04:HASH JOIN [RIGHT SEMI JOIN, PARTITIONED]
| hash predicates: id = a.id
| runtime filters: RF000 <- a.id
|
|--06:EXCHANGE [BROADCAST]
|--07:EXCHANGE [HASH(a.id)]
| |
| 02:SCAN HDFS [functional.alltypes]
| 03:HASH JOIN [INNER JOIN, BROADCAST]
| | hash predicates: b.id = a.id
| | runtime filters: RF001 <- a.id
| |
| |--05:EXCHANGE [BROADCAST]
| | |
| | 00:SCAN KUDU [functional_kudu.testtbl a]
| |
| 01:SCAN HDFS [functional.alltypes b]
| partitions=24/24 files=24 size=478.45KB
| runtime filters: RF001 -> b.id
|
03:HASH JOIN [INNER JOIN, BROADCAST]
| hash predicates: b.id = a.id
| runtime filters: RF001 <- a.id
06:EXCHANGE [HASH(id)]
|
|--05:EXCHANGE [BROADCAST]
| |
| 00:SCAN KUDU [functional_kudu.testtbl a]
|
01:SCAN HDFS [functional.alltypes b]
02:SCAN HDFS [functional.alltypes]
partitions=24/24 files=24 size=478.45KB
runtime filters: RF000 -> b.id, RF001 -> b.id
runtime filters: RF000 -> id
====

@@ -3,9 +3,9 @@ select * from functional_kudu.testtbl
00:SCAN KUDU [functional_kudu.testtbl]
---- SCANRANGELOCATIONS
NODE 0:
ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
ScanToken{table=testtbl, range-partition: [(int64 id=1007), <end>)}
ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -17,9 +17,9 @@ select * from functional_kudu.testtbl where name = '10'
kudu predicates: name = '10'
---- SCANRANGELOCATIONS
NODE 0:
ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
ScanToken{table=testtbl, range-partition: [(int64 id=1007), <end>)}
ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -99,9 +99,7 @@ and zip > 1 and zip < 50
kudu predicates: id >= 10, zip <= 5, id <= 20, zip >= 0, zip <= 30, zip > 1, zip < 50, name = 'foo'
---- SCANRANGELOCATIONS
NODE 0:
ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -117,9 +115,7 @@ where id < 10 + 30 and cast(sin(id) as boolean) = true and 20 * 3 >= id and 10
kudu predicates: id < 40, id <= 60, id < 103
---- SCANRANGELOCATIONS
NODE 0:
ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -136,9 +132,9 @@ where cast(sin(id) as boolean) = true and name = 'a'
kudu predicates: name = 'a'
---- SCANRANGELOCATIONS
NODE 0:
ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
ScanToken{table=testtbl, range-partition: [(int64 id=1007), <end>)}
ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|
@@ -155,9 +151,9 @@ where cast(sin(id) as boolean) = true and name is null
predicates: name IS NULL, CAST(sin(id) AS BOOLEAN) = TRUE
---- SCANRANGELOCATIONS
NODE 0:
ScanToken{table=testtbl, hash-partition-buckets: [0], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [1], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, hash-partition-buckets: [2], range-partition: [<start>, <end>)}
ScanToken{table=testtbl, range-partition: [(int64 id=1003), (int64 id=1007))}
ScanToken{table=testtbl, range-partition: [(int64 id=1007), <end>)}
ScanToken{table=testtbl, range-partition: [<start>, (int64 id=1003))}
---- DISTRIBUTEDPLAN
01:EXCHANGE [UNPARTITIONED]
|

@@ -4,4 +4,5 @@ file_format:seq, dataset:functional, compression_codec:snap, compression_type:bl
file_format:rc, dataset: functional, compression_codec: snap, compression_type: block
file_format:parquet, dataset: functional, compression_codec: none, compression_type: none
file_format:avro, dataset: functional, compression_codec: snap, compression_type: block
file_format:hbase, dataset:functional, compression_codec:none, compression_type:none
file_format:kudu, dataset:functional, compression_codec:none, compression_type:none

@@ -23,3 +23,4 @@ file_format: avro, dataset: functional, compression_codec: def, compression_type
file_format: avro, dataset: functional, compression_codec: snap, compression_type: block
file_format: parquet, dataset: functional, compression_codec: none, compression_type: none
file_format: hbase, dataset: functional, compression_codec: none, compression_type: none
file_format: kudu, dataset: functional, compression_codec: none, compression_type: none

@@ -5,3 +5,4 @@ file_format: rc, dataset: functional, compression_codec: gzip, compression_type:
file_format: avro, dataset: functional, compression_codec: snap, compression_type: block
file_format: parquet, dataset: functional, compression_codec: none, compression_type: none
file_format: hbase, dataset: functional, compression_codec: none, compression_type: none
file_format: kudu, dataset: functional, compression_codec: none, compression_type: none

@@ -833,8 +833,7 @@ bigint, boolean, boolean, bigint, double
====
---- QUERY
# Test ignored distinct in MIN and MAX with NULLs
select min(distinct NULL), max(distinct NULL) from alltypesagg
where day is not null
select min(distinct NULL), max(distinct NULL) from alltypes
---- RESULTS
NULL,NULL
---- TYPES

@@ -50,6 +50,10 @@ class SkipIfS3:
qualified_path = pytest.mark.skipif(IS_S3,
reason="Tests rely on HDFS qualified paths, IMPALA-1872")
class SkipIfKudu:
unsupported_env = pytest.mark.skipif(os.environ["KUDU_IS_SUPPORTED"] == "false",
reason="Kudu is not supported in this environment")
class SkipIf:
skip_hbase = pytest.mark.skipif(pytest.config.option.skip_hbase,
reason="--skip_hbase argument specified")

@@ -321,15 +321,18 @@ def verify_raw_results(test_section, exec_result, file_format, update_section=Fa
if test_section.get('TYPES'):
expected_types = [c.strip().upper() for c in test_section['TYPES'].rstrip('\n').split(',')]
# Avro and Kudu represent TIMESTAMP columns as strings, so tests using TIMESTAMP are
# skipped because results will be wrong.
if file_format in ('avro', 'kudu') and 'TIMESTAMP' in expected_types:
LOG.info("TIMESTAMP columns unsupported in %s, skipping verification." %\
file_format)
return
# Avro does not support as many types as Hive, so the Avro test tables may
# have different column types than we expect (e.g., INT instead of
# TINYINT). We represent TIMESTAMP columns as strings in Avro, so we bail in
# this case since the results will be wrong. Otherwise we bypass the type
# checking by ignoring the actual types of the Avro table.
# TINYINT). Bypass the type checking by ignoring the actual types of the Avro
# table.
if file_format == 'avro':
if 'TIMESTAMP' in expected_types:
LOG.info("TIMESTAMP columns unsupported in Avro, skipping verification.")
return
LOG.info("Skipping type verification of Avro-format table.")
actual_types = expected_types
else:

@@ -129,6 +129,8 @@ class TestAggregationQueries(ImpalaTestSuite):
if vector.get_value('table_format').file_format == 'hbase':
pytest.xfail("HBase returns columns in alphabetical order for select distinct *, "
"making the result verication to fail.")
if vector.get_value('table_format').file_format == 'kudu':
pytest.xfail("IMPALA-4042: count(distinct NULL) fails on a view, needed for kudu")
self.run_test_case('QueryTest/distinct', vector)
def test_group_concat(self, vector):

@@ -47,6 +47,9 @@ class TestExprs(ImpalaTestSuite):
pytest.skip()
if table_format.file_format == 'hbase':
pytest.xfail("A lot of queries check for NULLs, which hbase does not recognize")
if table_format.file_format == 'kudu':
# Can't load LikeTbl without KUDU-1570.
pytest.xfail("Need support for Kudu tables with nullable PKs (KUDU-1570)")
self.run_test_case('QueryTest/exprs', vector)
# This will change the current database to matching table format and then execute

@@ -49,26 +49,22 @@ class TestQueries(ImpalaTestSuite):
def get_workload(cls):
return 'functional-query'
def test_hdfs_scan_node(self, vector):
self.run_test_case('QueryTest/hdfs-scan-node', vector)
def test_analytic_fns(self, vector):
# TODO: Enable some of these tests for Avro if possible
# Don't attempt to evaluate timestamp expressions with Avro tables which doesn't
# TODO: Enable some of these tests for Avro/Kudu if possible
# Don't attempt to evaluate timestamp expressions with Avro/Kudu tables which don't
# support a timestamp type yet
table_format = vector.get_value('table_format')
if table_format.file_format == 'avro':
pytest.skip()
if table_format.file_format in ['avro', 'kudu']:
pytest.xfail("%s doesn't support TIMESTAMP" % (table_format.file_format))
if table_format.file_format == 'hbase':
pytest.xfail("A lot of queries check for NULLs, which hbase does not recognize")
self.run_test_case('QueryTest/analytic-fns', vector)
def test_file_partitions(self, vector):
self.run_test_case('QueryTest/hdfs-partitions', vector)
def test_limit(self, vector):
if vector.get_value('table_format').file_format == 'hbase':
pytest.xfail("IMPALA-283 - select count(*) produces inconsistent results")
if vector.get_value('table_format').file_format == 'kudu':
pytest.xfail("Limit queries without order by clauses are non-deterministic")
self.run_test_case('QueryTest/limit', vector)
def test_top_n(self, vector):
@@ -121,9 +117,9 @@ class TestQueries(ImpalaTestSuite):
def test_misc(self, vector):
table_format = vector.get_value('table_format')
if table_format.file_format in ['hbase', 'rc', 'parquet']:
if table_format.file_format in ['hbase', 'rc', 'parquet', 'kudu']:
msg = ("Failing on rc/snap/block despite resolution of IMP-624,IMP-503. "
"Failing on parquet because tables do not exist")
"Failing on kudu and parquet because tables do not exist")
pytest.xfail(msg)
self.run_test_case('QueryTest/misc', vector)
@@ -196,3 +192,20 @@ class TestQueriesParquetTables(ImpalaTestSuite):
vector.get_value('exec_option')['disable_outermost_topn'] = 1
vector.get_value('exec_option')['num_nodes'] = 1
self.run_test_case('QueryTest/single-node-large-sorts', vector)
# Tests for queries in HDFS-specific tables, e.g. AllTypesAggMultiFilesNoPart.
# This is a subclass of TestQueries to get the extra test dimension for
# exec_single_node_rows_threshold in exhaustive.
class TestHdfsQueries(TestQueries):
@classmethod
def add_test_dimensions(cls):
super(TestHdfsQueries, cls).add_test_dimensions()
# Kudu doesn't support AllTypesAggMultiFilesNoPart (KUDU-1271, KUDU-1570).
cls.TestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format != 'kudu')
def test_hdfs_scan_node(self, vector):
self.run_test_case('QueryTest/hdfs-scan-node', vector)
def test_file_partitions(self, vector):
self.run_test_case('QueryTest/hdfs-partitions', vector)

@@ -31,9 +31,9 @@ class TestRuntimeFilters(ImpalaTestSuite):
@classmethod
def add_test_dimensions(cls):
super(TestRuntimeFilters, cls).add_test_dimensions()
# Runtime filters are disabled on HBase
# Runtime filters are disabled on HBase, Kudu
cls.TestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format != 'hbase')
lambda v: v.get_value('table_format').file_format not in ['hbase', 'kudu'])
def test_basic_filters(self, vector):
self.run_test_case('QueryTest/runtime_filters', vector)

@@ -137,6 +137,9 @@ class TestUnmatchedSchema(ImpalaTestSuite):
"drop table if exists jointbl_test", vector)
def test_unmatched_schema(self, vector):
if vector.get_value('table_format').file_format == 'kudu':
pytest.xfail("IMPALA-2890: Missing Kudu DDL support")
table_format = vector.get_value('table_format')
# jointbl has no columns with unique values. When loaded in hbase, the table looks
# different, as hbase collapses duplicates.
@@ -161,6 +164,9 @@ class TestWideRow(ImpalaTestSuite):
lambda v: v.get_value('table_format').file_format != 'hbase')
def test_wide_row(self, vector):
if vector.get_value('table_format').file_format == 'kudu':
pytest.xfail("KUDU-666: Kudu support for large values")
new_vector = deepcopy(vector)
# Use a 5MB scan range, so we will have to perform 5MB of sync reads
new_vector.get_value('exec_option')['max_scan_range_length'] = 5 * 1024 * 1024
@@ -190,6 +196,9 @@ class TestWideTable(ImpalaTestSuite):
cls.TestMatrix.add_constraint(lambda v: False)
def test_wide_table(self, vector):
if vector.get_value('table_format').file_format == 'kudu':
pytest.xfail("IMPALA-3718: Extend Kudu functional test support")
NUM_COLS = vector.get_value('num_cols')
# Due to the way HBase handles duplicate row keys, we have different number of
# rows in HBase tables compared to HDFS tables.

@@ -33,7 +33,7 @@ class TestTpcdsQuery(ImpalaTestSuite):
def add_test_dimensions(cls):
super(TestTpcdsQuery, cls).add_test_dimensions()
cls.TestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format not in ['rc', 'hbase'] and\
v.get_value('table_format').file_format not in ['rc', 'hbase', 'kudu'] and\
v.get_value('table_format').compression_codec in ['none', 'snap'] and\
v.get_value('table_format').compression_type != 'record')