mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
Initial implementation of KUDU-1261 (array column type) recently merged in upstream Apache Kudu repository. This patch adds initial Impala support for working with Kudu tables having array type columns. Unlike rows, the elements of a Kudu array are stored in a different format than Impala's. Instead of a per-row bit flag for NULL info, values and NULL bits are stored in separate arrays. The following types of queries are not supported in this patch: - (IMPALA-14538) Queries that reference an array column as a table, e.g. ```sql SELECT item FROM kudu_array.array_int; ``` - (IMPALA-14539) Queries that create duplicate collection slots, e.g. ```sql SELECT array_int FROM kudu_array AS t, t.array_int AS unnested; ``` Testing: - Add some FE tests in AnalyzeDDLTest and AnalyzeKuduDDLTest. - Add EE test test_kudu.py::TestKuduArray. Since Impala does not support inserting complex types, including array, the data insertion part of the test is achieved through custom C++ code kudu-array-inserter.cc that inserts into Kudu via the Kudu C++ client. It would be great if we could migrate it to Python so that it can be moved to the same file as the test (IMPALA-14537). - Pass core tests. Co-authored-by: Riza Suminto Change-Id: I9282aac821bd30668189f84b2ed8fff7047e7310 Reviewed-on: http://gerrit.cloudera.org:8080/23493 Reviewed-by: Alexey Serbin <alexey@apache.org> Reviewed-by: Michael Smith <michael.smith@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
2096 lines
88 KiB
Python
2096 lines
88 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
from builtins import range
|
|
from copy import deepcopy
|
|
|
|
from base64 import b64decode
|
|
from kudu.schema import (
|
|
BOOL,
|
|
DOUBLE,
|
|
FLOAT,
|
|
INT16,
|
|
INT32,
|
|
INT64,
|
|
INT8,
|
|
SchemaBuilder,
|
|
STRING,
|
|
BINARY,
|
|
UNIXTIME_MICROS,
|
|
DATE)
|
|
from kudu.client import Partitioning
|
|
from kudu.util import to_unixtime_micros
|
|
import json
|
|
import logging
|
|
import pytest
|
|
import os
|
|
import random
|
|
import re
|
|
import subprocess
|
|
import textwrap
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from pytz import utc
|
|
|
|
from tests.common.environ import ImpalaTestClusterProperties, HIVE_MAJOR_VERSION
|
|
from tests.common.kudu_test_suite import KuduTestSuite
|
|
from tests.common.impala_cluster import ImpalaCluster
|
|
from tests.common.skip import SkipIfNotHdfsMinicluster, SkipIfKudu, SkipIfHive2
|
|
from tests.common.test_dimensions import (
|
|
HS2,
|
|
add_exec_option_dimension,
|
|
add_mandatory_exec_option,
|
|
create_single_exec_option_dimension)
|
|
from tests.verifiers.metric_verifier import MetricVerifier
|
|
|
|
KUDU_MASTER_HOSTS = pytest.config.option.kudu_master_hosts
|
|
IMPALA_TEST_CLUSTER_PROPERTIES = ImpalaTestClusterProperties.get_instance()
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def create_external_kudu_query(db, tbl, kudu_tbl_name, other_props=None):
  """Builds a CREATE EXTERNAL TABLE ... STORED AS KUDU statement.

  Args:
    db: Impala database the external table is created in.
    tbl: name of the Impala table to create.
    kudu_tbl_name: value for the mandatory 'kudu.table_name' table property.
    other_props: optional dict of extra TBLPROPERTIES; entries may override
      'kudu.table_name' since they are merged in afterwards.
  Returns:
    The CREATE EXTERNAL TABLE statement as a string.
  """
  tbl_props = {'kudu.table_name': kudu_tbl_name}
  if other_props is not None:
    for key, val in other_props.items():
      tbl_props[key] = val
  prop_clause = ', '.join(
      "'{}'='{}'".format(key, val) for key, val in tbl_props.items())
  template = """
      CREATE EXTERNAL TABLE {}.{}
      STORED AS KUDU
      TBLPROPERTIES ({})"""
  return template.format(db, tbl, prop_clause)
|
|
|
|
|
|
def set_latest_observed_option(vector, kudu_client):
  """Pins snapshot reads to the latest timestamp observed by 'kudu_client'.

  Sets the 'kudu_snapshot_read_timestamp_micros' query option on 'vector' so
  that subsequent READ_AT_SNAPSHOT scans see all writes applied through this
  Kudu client.
  """
  snapshot_micros = to_unixtime_micros(kudu_client.latest_observed_timestamp())
  vector.set_exec_option('kudu_snapshot_read_timestamp_micros', snapshot_micros)
|
|
|
|
|
|
# TODO(IMPALA-8614): parameterize some tests to run with HMS integration enabled.
|
|
class TestKuduBasicDML(KuduTestSuite):
  """Exercises the basic DML statements (INSERT/UPDATE/UPSERT/DELETE and
  CREATE TABLE LIKE) against Kudu tables."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestKuduBasicDML, cls).add_test_dimensions()
    # READ_LATEST is not consistent enough for these tests, so force snapshot reads.
    add_mandatory_exec_option(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")
    # Cover both threading models so Kudu DML is exercised with and without
    # multithreading. E.g. see IMPALA-9782.
    add_exec_option_dimension(cls, "mt_dop", [0, 4])

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_insert(self, vector, unique_database):
    # The .test file controls 'abort_on_error' itself, so drop it from the vector.
    # Revisit this if the 'abort_on_error' dimension size increases.
    vector.unset_exec_option('abort_on_error')
    test_file = 'QueryTest/kudu_insert'
    self.run_test_case(test_file, vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_update(self, vector, unique_database):
    test_file = 'QueryTest/kudu_update'
    self.run_test_case(test_file, vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_upsert(self, vector, unique_database):
    test_file = 'QueryTest/kudu_upsert'
    self.run_test_case(test_file, vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_delete(self, vector, unique_database):
    test_file = 'QueryTest/kudu_delete'
    self.run_test_case(test_file, vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_create_table_like_table(self, vector, unique_database):
    test_file = 'QueryTest/kudu_create_table_like_table'
    self.run_test_case(test_file, vector, use_db=unique_database)
|
|
|
|
|
|
class TestKuduTimestampConvert(KuduTestSuite):
  """Verifies that UTC timestamps read from Kudu tables are converted to local
  time (and written back as UTC)."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestKuduTimestampConvert, cls).add_test_dimensions()
    matrix = cls.ImpalaTestMatrix
    matrix.add_mandatory_exec_option('convert_kudu_utc_timestamps', 'true')
    matrix.add_mandatory_exec_option('write_kudu_utc_timestamps', 'true')
    matrix.add_mandatory_exec_option(
        'use_local_tz_for_unix_timestamp_conversions', 'false')
    matrix.add_mandatory_exec_option('timezone', '"America/Los_Angeles"')

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_timestamp_conversion(self, vector, unique_database):
    self.run_test_case(
        'QueryTest/kudu_timestamp_conversion', vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_predicate_with_timestamp_conversion(self, vector):
    self.run_test_case('QueryTest/kudu_predicate_with_timestamp_conversion', vector)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_runtime_filter_with_timestamp_conversion(self, vector):
    # The .test file sets its own timezone, so remove the mandatory one from a
    # copy of the vector rather than mutating the shared instance.
    modified_vector = deepcopy(vector)
    del modified_vector.get_value('exec_option')['timezone']
    self.run_test_case('QueryTest/kudu_runtime_filter_with_timestamp_conversion',
                       modified_vector)
|
|
|
|
|
|
# TODO(IMPALA-8614): parameterize some tests to run with HMS integration enabled.
|
|
class TestKuduOperations(KuduTestSuite):
  """
  This suite tests the different modification operations when using a kudu table.
  """

  @classmethod
  def add_test_dimensions(cls):
    super(TestKuduOperations, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
    # The default read mode of READ_LATEST does not provide high enough consistency for
    # these tests.
    add_mandatory_exec_option(cls, "kudu_read_mode", "READ_AT_SNAPSHOT")

  @SkipIfKudu.no_hybrid_clock()
  @SkipIfKudu.hms_integration_enabled()
  def test_out_of_range_timestamps(self, vector, kudu_client, unique_database):
    """Test timestamp values that are outside of Impala's supported date range."""
    tbl = 'times'
    self.execute_query_using_vector(
      ("CREATE TABLE {}.{} (a INT PRIMARY KEY, ts TIMESTAMP)"
       "PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU").format(unique_database, tbl),
      vector)
    assert kudu_client.table_exists(
      KuduTestSuite.to_kudu_table_name(unique_database, tbl))

    # Insert directly through the Kudu client so values outside Impala's supported
    # range can be written.
    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, tbl))
    session = kudu_client.new_session()
    session.apply(table.new_insert((0, datetime(1987, 5, 19, 0, 0, tzinfo=utc))))
    # Add a date before 1400
    session.apply(table.new_insert((1, datetime(1300, 1, 1, 0, 0, tzinfo=utc))))
    # TODO: Add a date after 9999. There isn't a way to represent a date greater than
    # 9999 in Python datetime.
    # session.apply(table.new_insert((2, datetime(12000, 1, 1, 0, 0, tzinfo=utc))))
    session.flush()

    set_latest_observed_option(vector, kudu_client)
    # TODO: The test driver should have a way to specify query options in an 'options'
    # section rather than having to split abort_on_error cases into separate files.
    vector.set_exec_option('abort_on_error', 0)
    self.run_test_case('QueryTest/kudu-overflow-ts', vector,
                       use_db=unique_database)

    vector.set_exec_option('abort_on_error', 1)
    self.run_test_case('QueryTest/kudu-overflow-ts-abort-on-error', vector,
                       use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_scan_node(self, vector, unique_database):
    """Runs the Kudu scan node .test cases."""
    self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database)

  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_insert_mem_limit(self, vector, unique_database):
    """Runs Kudu INSERT cases that exercise memory-limit handling."""
    self.run_test_case('QueryTest/kudu_insert_mem_limit', vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_partition_ddl(self, vector, unique_database):
    """Runs Kudu partition DDL .test cases."""
    self.run_test_case('QueryTest/kudu_partition_ddl', vector, use_db=unique_database)

  @pytest.mark.skipif(IMPALA_TEST_CLUSTER_PROPERTIES.is_remote_cluster(),
                      reason="Test references hardcoded hostnames: IMPALA-4873")
  @pytest.mark.execute_serially
  @SkipIfKudu.no_hybrid_clock()
  @SkipIfKudu.hms_integration_enabled()
  def test_kudu_alter_table(self, vector, unique_database):
    """Runs ALTER TABLE .test cases against Kudu tables."""
    self.run_test_case('QueryTest/kudu_alter', vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_stats(self, vector, unique_database):
    """Runs stats-related .test cases against Kudu tables."""
    self.run_test_case('QueryTest/kudu_stats', vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_describe(self, vector, unique_database):
    """Runs DESCRIBE .test cases against Kudu tables."""
    self.run_test_case('QueryTest/kudu_describe', vector, use_db=unique_database)

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_limit(self, vector, unique_database):
    """Runs LIMIT .test cases against Kudu tables."""
    self.run_test_case('QueryTest/kudu_limit', vector, use_db=unique_database)

  def test_kudu_column_options(self, vector, kudu_client, unique_database):
    """Test Kudu column options"""
    encodings = ["ENCODING PLAIN_ENCODING", ""]
    compressions = ["COMPRESSION SNAPPY", ""]
    nullability = ["NOT NULL", "NULL", ""]
    defaults = ["DEFAULT 1", ""]
    blocksizes = ["BLOCK_SIZE 32768", ""]
    indx = 1
    # Create one table per combination of column options and verify each one
    # materializes in Kudu.
    for encoding in encodings:
      for compression in compressions:
        for default in defaults:
          for blocksize in blocksizes:
            for nullable in nullability:
              impala_tbl_name = "test_column_options_%s" % str(indx)
              self.execute_query_using_vector(
                """
                CREATE TABLE {}.{} (
                  a INT PRIMARY KEY {} {} {} {},
                  b INT {} {} {} {} {}
                ) PARTITION BY HASH (a) PARTITIONS 3 STORED AS KUDU""".format(
                  unique_database, impala_tbl_name,
                  encoding, compression, default, blocksize,
                  nullable, encoding, compression, default, blocksize),
                vector)
              indx = indx + 1
              assert kudu_client.table_exists(
                KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name))

  def test_kudu_col_changed(self, vector, kudu_client, unique_database,
                            cluster_properties):
    """Test changing a Kudu column outside of Impala results in a failure on read with
    outdated metadata (IMPALA-4828)."""
    tbl = 'foo'
    db_tbl = unique_database + '.' + tbl
    self.execute_query_using_vector(
      """
      CREATE TABLE {}.{} (a INT PRIMARY KEY, s STRING)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""".format(unique_database, tbl),
      vector)
    assert kudu_client.table_exists(
      KuduTestSuite.to_kudu_table_name(unique_database, tbl))

    # Force metadata to be loaded on impalads
    self.execute_query_using_vector("select * from %s" % db_tbl, vector)

    # Load the table via the Kudu client and change col 's' to be a different type.
    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, tbl))
    alterer = kudu_client.new_table_alterer(table)
    alterer.drop_column("s")
    table = alterer.alter()
    alterer = kudu_client.new_table_alterer(table)
    alterer.add_column("s", "int32")
    table = alterer.alter()

    # Add some rows
    session = kudu_client.new_session()
    for i in range(100):
      op = table.new_insert((i, i))
      session.apply(op)
    session.flush()

    set_latest_observed_option(vector, kudu_client)
    # Scanning should result in an error with Catalog V1, since the metadata is cached.
    try:
      self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
      assert cluster_properties.is_catalog_v2_cluster(), \
          "Should fail with Catalog V1, which caches metadata"
    except Exception as e:
      assert not cluster_properties.is_catalog_v2_cluster(), \
          "Should succeed with Catalog V2, which does not cache metadata"
      expected_error = "Column 's' is type INT but Impala expected STRING. The table "\
          "metadata in Impala may be outdated and need to be refreshed."
      assert expected_error in str(e)

    # After a REFRESH the scan should succeed
    self.execute_query_using_vector("REFRESH %s" % db_tbl, vector)
    result = self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
    assert len(result.data) == 100

  def test_kudu_col_not_null_changed(self, vector, kudu_client, unique_database,
                                     cluster_properties):
    """Test changing a NOT NULL Kudu column outside of Impala results in a failure
    on read with outdated metadata (IMPALA-4828)."""
    tbl = 'foo'
    db_tbl = unique_database + '.' + tbl
    self.execute_query_using_vector(
      """CREATE TABLE %s (a INT PRIMARY KEY, s STRING NOT NULL)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % db_tbl,
      vector)
    assert kudu_client.table_exists(
      KuduTestSuite.to_kudu_table_name(unique_database, tbl))

    # Force metadata to be loaded on impalads
    self.execute_query_using_vector("select * from %s" % db_tbl, vector)

    # Load the table via the Kudu client and change col 's' to be nullable.
    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, tbl))
    alterer = kudu_client.new_table_alterer(table)
    alterer.drop_column("s")
    table = alterer.alter()
    alterer = kudu_client.new_table_alterer(table)
    alterer.add_column("s", "string", nullable=True)
    table = alterer.alter()

    # Add some rows with NULLs that Impala's cached NOT NULL schema cannot represent.
    session = kudu_client.new_session()
    for i in range(100):
      op = table.new_insert((i, None))
      session.apply(op)
    session.flush()

    set_latest_observed_option(vector, kudu_client)
    # Scanning should result in an error
    try:
      self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
      assert cluster_properties.is_catalog_v2_cluster(), \
          "Should fail with Catalog V1, which caches metadata"
    except Exception as e:
      assert not cluster_properties.is_catalog_v2_cluster(), \
          "Should succeed with Catalog V2, which does not cache metadata"
      expected_error = "Column 's' is nullable but Impala expected it to be "\
          "not nullable. The table metadata in Impala may be outdated and need to be "\
          "refreshed."
      assert expected_error in str(e)

    # After a REFRESH the scan should succeed
    self.execute_query_using_vector("REFRESH %s" % db_tbl, vector)
    result = self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
    assert len(result.data) == 100

  def test_kudu_col_null_changed(self, vector, kudu_client, unique_database,
                                 cluster_properties):
    """Test changing a NULL Kudu column outside of Impala results in a failure
    on read with outdated metadata (IMPALA-4828)."""
    tbl = 'foo'
    db_tbl = unique_database + '.' + tbl
    self.execute_query_using_vector(
      """CREATE TABLE %s (a INT PRIMARY KEY, s STRING NULL)
      PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % db_tbl, vector)
    assert kudu_client.table_exists(
      KuduTestSuite.to_kudu_table_name(unique_database, tbl))

    # Force metadata to be loaded on impalads
    self.execute_query_using_vector("select * from %s" % db_tbl, vector)

    # Load the table via the Kudu client and change col 's' to be NOT NULL.
    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, tbl))
    alterer = kudu_client.new_table_alterer(table)
    alterer.drop_column("s")
    table = alterer.alter()
    alterer = kudu_client.new_table_alterer(table)
    alterer.add_column("s", "string", nullable=False, default="bar")
    table = alterer.alter()

    # Add some rows
    session = kudu_client.new_session()
    for i in range(100):
      op = table.new_insert((i, "bar"))
      session.apply(op)
    session.flush()

    set_latest_observed_option(vector, kudu_client)
    # Scanning should result in an error
    try:
      self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
      assert cluster_properties.is_catalog_v2_cluster(), \
          "Should fail with Catalog V1, which caches metadata"
    except Exception as e:
      assert not cluster_properties.is_catalog_v2_cluster(), \
          "Should succeed with Catalog V2, which does not cache metadata"
      expected_error = "Column 's' is not nullable but Impala expected it to be "\
          "nullable. The table metadata in Impala may be outdated and need to be "\
          "refreshed."
      assert expected_error in str(e)

    # After a REFRESH the scan should succeed
    self.execute_query_using_vector("REFRESH %s" % db_tbl, vector)
    result = self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
    assert len(result.data) == 100

  def test_kudu_col_added(self, vector, kudu_client, unique_database,
                          cluster_properties):
    """Test adding a Kudu column outside of Impala."""
    tbl = 'foo'
    db_tbl = unique_database + '.' + tbl
    self.execute_query_using_vector("""CREATE TABLE %s (a INT PRIMARY KEY)
        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % db_tbl, vector)
    assert kudu_client.table_exists(
      KuduTestSuite.to_kudu_table_name(unique_database, tbl))

    # Force metadata to be loaded on impalads
    self.execute_query_using_vector("select * from %s" % db_tbl, vector)

    # Load the table via the Kudu client and add a new col
    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, tbl))
    alterer = kudu_client.new_table_alterer(table)
    alterer.add_column("b", "int32")
    table = alterer.alter()

    # Add some rows
    session = kudu_client.new_session()
    op = table.new_insert((0, 0))
    session.apply(op)
    session.flush()

    set_latest_observed_option(vector, kudu_client)
    result = self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
    if cluster_properties.is_catalog_v2_cluster():
      # Changes in Kudu should be immediately visible to Impala with Catalog V2.
      assert result.data[0].split('\t') == ['0', '0']
    else:
      # Only the first col is visible to Impala. Impala will not know about the missing
      # column, so '*' is expanded to known columns. This doesn't have a separate check
      # because the query can proceed and checking would need to fetch metadata from the
      # Kudu master, which is what REFRESH is for.
      assert result.data[0].split('\t') == ['0']

    # After a REFRESH both cols should be visible
    self.execute_query_using_vector("REFRESH %s" % db_tbl, vector)
    result = self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
    assert result.data[0].split('\t') == ['0', '0']

  @SkipIfKudu.no_hybrid_clock()
  @SkipIfKudu.hms_integration_enabled()
  def test_kudu_col_removed(self, vector, kudu_client, unique_database):
    """Test removing a Kudu column outside of Impala."""
    tbl = 'foo'
    db_tbl = unique_database + '.' + tbl
    self.execute_query_using_vector("""CREATE TABLE %s (a INT PRIMARY KEY, s STRING)
        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % db_tbl, vector)
    assert kudu_client.table_exists(
      KuduTestSuite.to_kudu_table_name(unique_database, tbl))

    # Force metadata to be loaded on impalads
    self.execute_query_using_vector("select * from %s" % db_tbl, vector)
    self.execute_query_using_vector("insert into %s values (0, 'foo')" % db_tbl, vector)

    # Load the table via the Kudu client and drop col 's'.
    table = kudu_client.table(KuduTestSuite.to_kudu_table_name(unique_database, tbl))
    alterer = kudu_client.new_table_alterer(table)
    alterer.drop_column("s")
    table = alterer.alter()

    # Scanning should result in an error
    # NOTE(review): unlike the sibling tests above, a successful scan here is not
    # asserted against the catalog version — confirm whether that is intentional.
    try:
      self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
    except Exception as e:
      expected_error = "Column 's' not found in kudu table impala::test_kudu_col_removed"
      assert expected_error in str(e)

    # After a REFRESH the scan should succeed
    self.execute_query_using_vector("REFRESH %s" % db_tbl, vector)
    result = self.execute_query_using_vector("SELECT * FROM %s" % db_tbl, vector)
    assert result.data == ['0']

  def test_kudu_show_unbounded_range_partition(self, vector, kudu_client,
                                               unique_database):
    """Check that a single unbounded range partition gets printed correctly."""
    schema_builder = SchemaBuilder()
    column_spec = schema_builder.add_column("id", INT64)
    column_spec.nullable(False)
    schema_builder.set_primary_keys(["id"])
    schema = schema_builder.build()
    tbl = 'unbounded_range_table'
    name = unique_database + "." + tbl

    try:
      kudu_client.create_table(
        name, schema, partitioning=Partitioning().set_range_partition_columns(["id"]))
      kudu_table = kudu_client.table(name)

      self.execute_query_using_vector(
        create_external_kudu_query(unique_database, tbl, kudu_table.name), vector)
      result = self.execute_query("SHOW RANGE PARTITIONS %s" % name)
      assert result.column_labels == ['RANGE (ID)']
      assert result.column_types == ['STRING']
      assert result.data == ['UNBOUNDED']

    finally:
      # The table was created directly through the Kudu client, so clean it up there.
      if kudu_client.table_exists(name):
        kudu_client.delete_table(name)

  @SkipIfKudu.no_hybrid_clock()
  def test_column_storage_attributes(self, vector, unique_database):
    """Tests that for every valid combination of column type, encoding, and compression,
    we can insert a value and scan it back from Kudu."""
    # This test takes about 2min and is unlikely to break, so only run it in exhaustive.
    if self.exploration_strategy() != 'exhaustive':
      pytest.skip("Only runs in exhaustive to reduce core time.")
    table_name = "%s.storage_attrs" % unique_database
    types = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'float', 'double',
        'string', 'timestamp', 'decimal', 'date', 'varchar(10)']
    create_query = "create table %s (id int primary key" % table_name
    for t in types:
      # We truncate the type attributes in the column name to keep things simple.
      create_query += ", %s_col %s" % (t.split('(')[0], t)
    create_query += ") partition by hash(id) partitions 16 stored as kudu"
    self.execute_query_using_vector(create_query, vector)

    encodings = ['AUTO_ENCODING', 'PLAIN_ENCODING', 'PREFIX_ENCODING', 'GROUP_VARINT',
        'RLE', 'DICT_ENCODING', 'BIT_SHUFFLE']
    compressions = ['DEFAULT_COMPRESSION', 'NO_COMPRESSION', 'SNAPPY', 'LZ4', 'ZLIB']
    i = 0
    for e in encodings:
      for c in compressions:
        for t in types:
          try:
            # We truncate the type attributes in the column name to keep things simple.
            self.execute_query_using_vector(
              """
              alter table {} alter column {}_col
              set encoding {} compression {}""".format(
                table_name, t.split('(')[0], e, c),
              vector)
          except Exception as err:
            assert "encoding %s not supported for type" % e in str(err)
        self.execute_query_using_vector(
          """insert into %s values (%s, true, 0, 0, 0, 0, 0, 0, '0',
          cast('2009-01-01' as timestamp), cast(0 as decimal),
          cast('2010-01-01' as date), cast('' as varchar(10)))""" % (table_name, i),
          vector)
        result = self.execute_query_using_vector(
          "select * from %s where id = %s" % (table_name, i), vector)
        # Beeswax return 'true' while HS2 return 'True'. Lowercase the result.
        assert result.data[0].lower().split('\t') == [
            str(i), 'true', '0', '0', '0', '0', '0', '0', '0',
            '2009-01-01 00:00:00', '0', '2010-01-01', '']
        i += 1
    result = self.execute_query_using_vector(
      "select count(*) from %s" % table_name, vector)
    # Bug fix: this previously only printed the comparison result, so a wrong row
    # count could never fail the test. Assert the count instead.
    assert result.data == [str(i)]

  def test_concurrent_schema_change(self, vector, unique_database):
    """Tests that an insert into a Kudu table with a concurrent schema change either
    succeeds or fails gracefully."""
    table_name = "%s.test_schema_change" % unique_database
    self.execute_query_using_vector(
      """create table %s (col0 bigint primary key, col1 bigint)
      partition by hash(col0) partitions 16 stored as kudu""" % table_name, vector)

    iters = 5

    def insert_values():
      # Errors are collected on the thread object so the main thread can inspect them
      # after join().
      threading.current_thread().errors = []
      client = self.create_impala_client_from_vector(vector)
      for i in range(0, iters):
        time.sleep(random.random())  # sleeps for up to one second
        try:
          client.execute("insert into %s values (0, 0), (1, 1)" % table_name)
        except Exception as e:
          threading.current_thread().errors.append(e)

    insert_thread = threading.Thread(target=insert_values)
    insert_thread.start()

    # Concurrently drop and re-add col1, alternating its type, while inserts run.
    for i in range(0, iters):
      time.sleep(random.random())  # sleeps for up to one second
      self.execute_query_using_vector(
        "alter table %s drop column col1" % table_name, vector)
      if i % 2 == 0:
        self.execute_query_using_vector(
          "alter table %s add columns (col1 string)" % table_name, vector)
      else:
        self.execute_query_using_vector(
          "alter table %s add columns (col1 bigint)" % table_name, vector)

    insert_thread.join()

    for error in insert_thread.errors:
      msg = str(error)
      # The first two are AnalysisExceptions, the next two come from KuduTableSink::Open()
      # if the schema has changed since analysis, the rest come from the Kudu server if
      # the schema changes between KuduTableSink::Open() and when the write ops are sent.
      possible_errors = [
        "has fewer columns (1) than the SELECT / VALUES clause returns (2)",
        "(type: TINYINT) is not compatible with column 'col1' (type: STRING)",
        "has fewer columns than expected.",
        "Column col1 has unexpected type.",
        "Client provided column col1[int64 NULLABLE] not present in tablet",
        "Client provided column col1 INT64 NULLABLE not present in tablet",
        "The column 'col1' must have type string NULLABLE found int64 NULLABLE"
      ]
      assert any(err in msg for err in possible_errors)

  def _retry_query(self, query, vector, expected):
    """Runs 'query' up to 3 times until its result matches 'expected'; fails if it
    never does. Used for READ_LATEST scans whose results may lag writes."""
    retries = 0
    while retries < 3:
      result = self.execute_query_using_vector(query, vector)
      if result.data == expected:
        break
      retries += 1
      time.sleep(1)
    assert retries < 3, \
        "Did not get a correct result for %s after 3 retries: %s" % (query, result)

  def test_read_modes(self, vector, unique_database):
    """Other Kudu tests are run with a scan level of READ_AT_SNAPSHOT to have predicable
    scan results. This test verifies that scans work as expected at the scan level of
    READ_LATEST by retrying the scan if the results are incorrect."""
    vector.set_exec_option('kudu_read_mode', 'READ_LATEST')
    table_name = "%s.test_read_latest" % unique_database
    self.execute_query_using_vector(
      """create table %s (a int primary key, b string)
      partition by hash(a)
      partitions 8 stored as kudu""" % table_name, vector)
    self.execute_query_using_vector(
      "insert into %s values (0, 'a'), (1, 'b'), (2, 'c')" % table_name, vector)
    self._retry_query("select * from %s order by a" % table_name, vector,
                      ['0\ta', '1\tb', '2\tc'])
    self.execute_query_using_vector(
      """insert into %s select id, string_col from functional.alltypes
      where id > 2 limit 100""" % table_name, vector)
    self._retry_query("select count(*) from %s" % table_name, vector, ['103'])

  def test_replica_selection(self, vector, unique_database):
    """This test verifies that scans work as expected with different replica selection.
    """
    table_name = "%s.replica_selection" % unique_database
    self.execute_query_using_vector(
      """create table %s (a int primary key, b string)
      partition by hash(a)
      partitions 8 stored as kudu""" % table_name, vector)
    self.execute_query_using_vector(
      """insert into %s select id, string_col from functional.alltypes
      limit 100""" % table_name, vector)

    vector.set_exec_option('kudu_replica_selection', 'LEADER_ONLY')
    result = self.execute_query_using_vector(
      "select count(*) from %s" % table_name, vector)
    assert result.data == ['100']

    vector.set_exec_option('kudu_replica_selection', 'CLOSEST_REPLICA')
    result = self.execute_query_using_vector(
      "select count(*) from %s" % table_name, vector)
    assert result.data == ['100']
|
|
|
|
|
|
class TestKuduPartitioning(KuduTestSuite):
  """Sanity checks for KuduPartitionExpr: inserted rows should be spread evenly
  across the Kudu partitions."""

  @classmethod
  def add_test_dimensions(cls):
    # Test both the interpreted and the codegen'd path.
    super(TestKuduPartitioning, cls).add_test_dimensions()

  def test_partitions_evenly_distributed(self, vector, kudu_client, unique_database):
    """Sanity check for KuduPartitionExpr. We insert numbers into a table and check that
    inserted elements are distributed evenly among the partitions. The assumption here is
    that the source distribution is more or less uniform and that hashing retains this
    property. This protects against some but not all errors. The number of partitions
    should be the same as the number of impalads."""

    table_name = "partitioning"
    table_full_name = unique_database + ".partitioning"
    exec_options = vector.get_exec_option_dict()
    self.execute_query("""CREATE TABLE %s (a INT PRIMARY KEY)
        PARTITION BY HASH(a) PARTITIONS 3 STORED AS KUDU""" % table_full_name,
        exec_options)
    assert kudu_client.table_exists(KuduTestSuite.to_kudu_table_name(
        unique_database, table_name))

    insert_stmt = "INSERT INTO %s SELECT id FROM functional.alltypes" % table_full_name
    result = self.execute_query(insert_stmt, exec_options)

    row_counts = TestKuduPartitioning.extract_kudu_rows_from_profile(
        result.runtime_profile)
    TestKuduPartitioning.assert_rows_evenly_distributed(row_counts)

  @staticmethod
  def assert_rows_evenly_distributed(rows):
    """Asserts every per-node row count is within 10% of the averaged summary count.

    'rows[0]' is the averaged summary value; the remaining entries are the
    per-node counts.
    """
    TOLERANCE_RATIO = 0.1
    avg, per_node_counts = rows[0], rows[1:]

    for count in per_node_counts:
      deviation = float(abs(avg - count)) / avg
      assert deviation < TOLERANCE_RATIO

  @staticmethod
  def extract_kudu_rows_from_profile(profile):
    """Returns the TotalNumRows counter of every KuduTableSink section in 'profile'."""
    rows_pattern = re.compile(r"TotalNumRows:.*\(([0-9]+)\)")
    counts = []

    # Scan for a "KuduTableSink" header, then pick up the first TotalNumRows
    # counter that follows it; repeat for every sink section.
    in_sink_section = False
    for line in profile.splitlines():
      if not in_sink_section:
        in_sink_section = "KuduTableSink" in line
        continue
      match = rows_pattern.search(line)
      if match:
        counts.append(int(match.group(1)))
        in_sink_section = False
    return counts
|
|
|
|
|
|
class TestCreateExternalTable(KuduTestSuite):
  """Tests CREATE EXTERNAL TABLE ... STORED AS KUDU against tables created
  directly through the Kudu client: schema/type mapping, implicit table
  properties, name resolution, and drop semantics."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestCreateExternalTable, cls).add_test_dimensions()
    # These tests are not sensitive to exec options; run a single combination.
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  @SkipIfKudu.hms_integration_enabled()
  def test_external_timestamp_default_value(self, vector, kudu_client, unique_database):
    """Checks that a Kudu table created outside Impala with a default value on a
    UNIXTIME_MICROS column can be loaded by Impala, and validates the DESCRIBE
    output is correct."""
    schema_builder = SchemaBuilder()
    column_spec = schema_builder.add_column("id", INT64)
    column_spec.nullable(False)
    column_spec = schema_builder.add_column("ts", UNIXTIME_MICROS)
    # 2009-01-01 00:00:00 UTC == 1230768000000000 micros since epoch, which is
    # the default value asserted in the DESCRIBE output below.
    column_spec.default(datetime(2009, 1, 1, 0, 0, tzinfo=utc))
    schema_builder.set_primary_keys(["id"])
    schema = schema_builder.build()
    tbl = 'tsdefault'
    name = unique_database + "." + tbl

    try:
      kudu_client.create_table(
          name, schema, partitioning=Partitioning().set_range_partition_columns(["id"]))
      kudu_table = kudu_client.table(name)

      self.execute_query_using_vector(
          create_external_kudu_query(unique_database, tbl, kudu_table.name), vector)
      result = self.execute_query_using_vector("DESCRIBE %s" % name, vector)
      table_desc = [[col.strip() if col else col for col in row.split('\t')]
                    for row in result.data]
      # Pytest shows truncated output on failure, so print the details just in case.
      LOG.info(table_desc)
      assert ["ts", "timestamp", "", "false", "", "true", "1230768000000000",
              "AUTO_ENCODING", "DEFAULT_COMPRESSION", "0"] in table_desc
    finally:
      if kudu_client.table_exists(name):
        kudu_client.delete_table(name)

  @SkipIfKudu.hms_integration_enabled()
  def test_implicit_table_props(self, vector, kudu_client, unique_database):
    """Check that table properties added internally during table creation are as
    expected.
    """
    with self.temp_kudu_table(
        kudu_client, [STRING, INT8, BOOL], num_key_cols=2) as kudu_table:
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)

      self.execute_query_using_vector(
          create_external_kudu_query(unique_database, impala_table_name, kudu_table.name),
          vector)
      result = self.execute_query_using_vector(
          "DESCRIBE FORMATTED %s.%s" % (unique_database, impala_table_name), vector)
      table_desc = [[col.strip() if col else col for col in row.split('\t')]
                    for row in result.data]
      LOG.info(table_desc)
      # Pytest shows truncated output on failure, so print the details just in case.
      assert ["", "EXTERNAL", "TRUE"] in table_desc
      assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc
      assert ["", "kudu.table_name", kudu_table.name] in table_desc
      assert ["", "storage_handler", "org.apache.hadoop.hive.kudu.KuduStorageHandler"] \
          in table_desc

  @SkipIfKudu.hms_integration_enabled()
  def test_col_types(self, vector, kudu_client, unique_database):
    """Check that a table can be created using all available column types."""
    # TODO: Add DECIMAL when the Kudu python client supports decimal
    kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8, BINARY,
                  UNIXTIME_MICROS, DATE]
    with self.temp_kudu_table(kudu_client, kudu_types) as kudu_table:
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)

      self.execute_query_using_vector(
          create_external_kudu_query(unique_database, impala_table_name, kudu_table.name),
          vector)
      result = self.execute_query_using_vector(
          "DESCRIBE %s.%s" % (unique_database, impala_table_name), vector)
      kudu_schema = kudu_table.schema
      # DESCRIBE rows come back in schema order, so zip them positionally.
      for i, row in enumerate(result.data):
        cols = row.split('\t')
        kudu_col = kudu_schema[i]
        assert cols[0] == kudu_col.name
        assert cols[1].upper() == \
            self.kudu_col_type_to_impala_col_type(kudu_col.type.type)

  @SkipIfKudu.hms_integration_enabled()
  def test_drop_external_table(self, vector, kudu_client, unique_database):
    """Check that dropping an external table only affects the catalog and does not delete
    the table in Kudu.
    """
    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)

      self.execute_query_using_vector(
          create_external_kudu_query(unique_database, impala_table_name, kudu_table.name),
          vector)
      result = self.execute_query_using_vector(
          "SELECT COUNT(*) FROM %s.%s" % (unique_database, impala_table_name), vector)
      assert result.data == ['0']
      result = self.execute_query_using_vector(
          "DROP TABLE %s.%s" % (unique_database, impala_table_name), vector)
      assert result.success
      # After the drop the table is gone from the catalog...
      try:
        self.execute_query_using_vector(
            "SELECT COUNT(*) FROM %s.%s" % (unique_database, impala_table_name), vector)
        assert False
      except Exception as e:
        assert "Could not resolve table reference" in str(e)
      # ...but still present in Kudu.
      assert kudu_client.table_exists(kudu_table.name)

  @SkipIfKudu.hms_integration_enabled()
  def test_explicit_name(self, vector, kudu_client, unique_database):
    """Check that a Kudu table can be specified using a table property."""
    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
      table_name = self.random_table_name()

      self.execute_query_using_vector(
          create_external_kudu_query(unique_database, table_name, kudu_table.name),
          vector)
      result = self.execute_query_using_vector(
          "SELECT * FROM %s.%s" % (unique_database, table_name), vector)
      assert len(result.data) == 0

  @SkipIfKudu.hms_integration_enabled()
  def test_explicit_name_preference(self, vector, kudu_client, unique_database):
    """Check that the table name from a table property is used when a table of the
    implied name also exists.
    """
    # 'preferred' is INT64, 'other' is INT8; the asserted "bigint" below proves
    # the property-named table won.
    with self.temp_kudu_table(kudu_client, [INT64]) as preferred_kudu_table:
      with self.temp_kudu_table(kudu_client, [INT8]) as other_kudu_table:
        impala_table_name = self.get_kudu_table_base_name(other_kudu_table.name)

        self.execute_query_using_vector(
            create_external_kudu_query(
                unique_database, impala_table_name, preferred_kudu_table.name),
            vector)
        result = self.execute_query_using_vector(
            "DESCRIBE %s.%s" % (unique_database, impala_table_name), vector)
        assert result.data[0].split('\t') == \
            ["a", "bigint", "", "true", "true", "false", "", "AUTO_ENCODING",
             "DEFAULT_COMPRESSION", "0"]

  @SkipIfKudu.hms_integration_enabled()
  def test_explicit_name_doesnt_exist(self, vector, unique_database):
    """Check that creating an external table pointing at a non-existent Kudu
    table fails with a clear error."""
    kudu_table_name = self.random_table_name()
    try:
      self.execute_query_using_vector(
          create_external_kudu_query(
              unique_database, self.random_table_name(), kudu_table_name),
          vector)
      assert False
    except Exception as e:
      assert "Table does not exist in Kudu: '%s'" % kudu_table_name in str(e)

  @SkipIfKudu.hms_integration_enabled()
  def test_explicit_name_doesnt_exist_but_implicit_does(self, vector, kudu_client,
      unique_database):
    """Check that when an explicit table name is given but that table doesn't exist,
    there is no fall-through to an existing implicit table.
    """
    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
      table_name = self.random_table_name()
      try:
        self.execute_query_using_vector(
            create_external_kudu_query(
                unique_database, self.get_kudu_table_base_name(kudu_table.name), table_name),
            vector)
        assert False
      except Exception as e:
        assert "Table does not exist in Kudu: '%s'" % table_name in str(e)

  @SkipIfKudu.no_hybrid_clock()
  @SkipIfKudu.hms_integration_enabled()
  def test_table_without_partitioning(self, vector, kudu_client, unique_database):
    """Test a Kudu table created without partitioning (i.e. equivalent to a single
    unbounded partition). It is not possible to create such a table in Impala, but
    it can be created directly in Kudu and then loaded as an external table.
    Regression test for IMPALA-5154."""
    vector.set_exec_option('kudu_read_mode', 'READ_AT_SNAPSHOT')
    schema_builder = SchemaBuilder()
    column_spec = schema_builder.add_column("id", INT64)
    column_spec.nullable(False)
    schema_builder.set_primary_keys(["id"])
    schema = schema_builder.build()
    # Empty range-partition column list -> one big unbounded partition.
    partitioning = Partitioning().set_range_partition_columns([])
    tbl = 'one_big_unbounded_partition'
    name = "%s.%s" % (unique_database, tbl)

    try:
      kudu_client.create_table(name, schema, partitioning=partitioning)

      self.execute_query_using_vector(
          create_external_kudu_query(unique_database, tbl, name), vector)
      self.execute_query_using_vector(
          "INSERT INTO %s VALUES (1), (2), (3)" % name, vector)
      result = self.execute_query_using_vector(
          "SELECT COUNT(*) FROM %s" % name, vector)
      assert result.data == ['3']
      try:
        self.execute_query_using_vector(
            "SHOW RANGE PARTITIONS %s" % name, vector)
        assert False
      except Exception as e:
        assert "AnalysisException: SHOW RANGE PARTITIONS requested but table does "\
            "not have range partitions" in str(e)
    finally:
      if kudu_client.table_exists(name):
        kudu_client.delete_table(name)

  @SkipIfKudu.no_hybrid_clock()
  @SkipIfKudu.hms_integration_enabled()
  def test_column_name_case(self, vector, kudu_client, unique_database):
    """IMPALA-5286: Tests that an external Kudu table that was created with a column name
    containing upper case letters is handled correctly."""
    vector.set_exec_option('kudu_read_mode', 'READ_AT_SNAPSHOT')
    tbl = 'kudu_external_test'
    table_name = '%s.%s' % (unique_database, tbl)
    if kudu_client.table_exists(table_name):
      kudu_client.delete_table(table_name)

    schema_builder = SchemaBuilder()
    key_col = 'Key'
    schema_builder.add_column(key_col, INT64).nullable(False).primary_key()
    schema = schema_builder.build()
    partitioning = Partitioning().set_range_partition_columns([key_col])\
        .add_range_partition([1], [10])

    try:
      kudu_client.create_table(table_name, schema, partitioning)

      self.execute_query_using_vector(
          create_external_kudu_query(unique_database, tbl, table_name), vector)

      # Perform a variety of operations on the table. Column references below
      # deliberately use mixed case to exercise case-insensitive resolution.
      self.execute_query_using_vector(
          "insert into %s (kEy) values (5), (1), (4)" % table_name, vector)
      result = self.execute_query_using_vector(
          "select keY from %s where KeY %% 2 = 0" % table_name, vector)
      assert result.data == ['4']
      result = self.execute_query_using_vector(
          "select * from %s order by kEY" % table_name, vector)
      assert result.data == ['1', '4', '5']

      # Do a join with a runtime filter targeting the column.
      result = self.execute_query_using_vector(
          "select count(*) from %s a, %s b where a.key = b.key" % (table_name, table_name),
          vector)
      assert result.data == ['3']

      self.execute_query_using_vector(
          "alter table %s add range partition 11 < values < 20" % table_name, vector)

      new_key = "KEY2"
      self.execute_query_using_vector(
          "alter table %s change KEy %s bigint" % (table_name, new_key),
          vector)
      val_col = "vaL"
      self.execute_query_using_vector(
          "alter table %s add columns (%s bigint)" % (table_name, val_col), vector)

      result = self.execute_query_using_vector("describe %s" % table_name, vector)
      # 'describe' should print the column name in lower case.
      assert new_key.lower() in result.data[0]
      assert val_col.lower() in result.data[1]

      self.execute_query_using_vector(
          "alter table %s drop column Val" % table_name, vector)
      result = self.execute_query_using_vector("describe %s" % table_name, vector)
      assert len(result.data) == 1

      self.execute_query_using_vector(
          "alter table %s drop range partition 11 < values < 20" % table_name, vector)
    finally:
      if kudu_client.table_exists(table_name):
        kudu_client.delete_table(table_name)

  @SkipIfKudu.hms_integration_enabled()
  def test_conflicting_column_name(self, vector, kudu_client, unique_database):
    """IMPALA-5283: Tests that loading an external Kudu table that was created with column
    names that differ only in case results in an error."""
    tbl = 'kudu_external_test'
    table_name = '%s.%s' % (unique_database, tbl)
    if kudu_client.table_exists(table_name):
      kudu_client.delete_table(table_name)

    schema_builder = SchemaBuilder()
    col0 = 'col'
    schema_builder.add_column(col0, INT64).nullable(False).primary_key()
    col1 = 'COL'
    schema_builder.add_column(col1, INT64)
    schema = schema_builder.build()
    partitioning = Partitioning().set_range_partition_columns([col0])\
        .add_range_partition([1], [10])

    try:
      kudu_client.create_table(table_name, schema, partitioning)

      self.execute_query_using_vector(
          create_external_kudu_query(unique_database, tbl, table_name), vector)
      assert False, 'create table should have resulted in an exception'
    except Exception as e:
      assert 'Error loading Kudu table: Impala does not support column names that ' \
          + 'differ only in casing' in str(e)
    finally:
      if kudu_client.table_exists(table_name):
        kudu_client.delete_table(table_name)
|
|
|
|
|
|
class TestShowCreateTable(KuduTestSuite):
  """Tests that SHOW CREATE TABLE output for Kudu tables round-trips the
  original CREATE statement, including column attributes, partitioning, and
  table properties."""

  # Default per-column attribute string that SHOW CREATE TABLE emits when the
  # CREATE statement specified none.
  column_properties = "ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION"

  def assert_show_create_equals(self, db_name, create_sql, show_create_sql,
      do_exact_match=False, extra_args=None):
    """Executes 'create_sql' to create a table, then runs "SHOW CREATE TABLE" and checks
    that the output is the same as 'show_create_sql'. 'create_sql' and
    'show_create_sql' can be templates that can be used with str.format(). format()
    will be called with 'table', 'db', 'kudu_addr', and 'extra_args' entries as
    keyword args. Also, compares HMS-3 specific output due to HMS translation.
    If do_exact_match is True does not manipulate the output and compares exactly
    with the show_create_sql parameter.
    """
    format_args = {
        "table": self.random_table_name(),
        "db": db_name,
        "kudu_addr": KUDU_MASTER_HOSTS}
    if extra_args is not None:
      format_args.update(extra_args)
    self.execute_query(create_sql.format(**format_args))
    result = self.execute_query("SHOW CREATE TABLE {db}.{table}".format(**format_args))
    output = result.data[0]
    if not do_exact_match and HIVE_MAJOR_VERSION > 2:
      # in case of HMS-3 all Kudu tables are translated to external tables with some
      # additional properties. This code below makes sure that we have the expected table
      # properties and the table is external
      # TODO we should move these tests to a query.test file so that we can have better
      # way to compare the output against different hive versions
      assert output.startswith("CREATE EXTERNAL TABLE")
      assert "'external.table.purge'='TRUE', " in output
      # We have made sure that the output starts with CREATE EXTERNAL TABLE, now we can
      # change it to "CREATE TABLE" to make it easier to compare rest of the str
      output = output.replace("CREATE EXTERNAL TABLE", "CREATE TABLE")
      # We should also remove the additional tbl property external.table.purge so that we
      # can compare the rest of output
      output = output.replace("'external.table.purge'='TRUE', ", "")
    assert output == \
        textwrap.dedent(show_create_sql.format(**format_args)).strip()

  @SkipIfKudu.hms_integration_enabled()
  def test_primary_key_and_distribution(self, unique_database):
    """Covers combinations of primary keys, encodings, compressions, defaults
    and hash/range partitioning in the SHOW CREATE TABLE output."""
    # TODO: Add case with BLOCK_SIZE
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE TABLE {db}.{table} (c INT PRIMARY KEY)
      PARTITION BY HASH (c) PARTITIONS 3 STORED AS KUDU""",
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        PRIMARY KEY (c)
      )
      PARTITION BY HASH (c) PARTITIONS 3
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""")
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE TABLE {db}.{table} (c INT PRIMARY KEY, d STRING NULL)
      PARTITION BY HASH (c) PARTITIONS 3, RANGE (c)
      (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2,
       PARTITION 2 < VALUES) STORED AS KUDU""",
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        d STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        PRIMARY KEY (c)
      )
      PARTITION BY HASH (c) PARTITIONS 3, RANGE (c) (...)
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""")
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE TABLE {db}.{table} (c INT ENCODING PLAIN_ENCODING, PRIMARY KEY (c))
      PARTITION BY HASH (c) PARTITIONS 3 STORED AS KUDU""",
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL ENCODING PLAIN_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        PRIMARY KEY (c)
      )
      PARTITION BY HASH (c) PARTITIONS 3
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""")
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE TABLE {db}.{table} (c INT COMPRESSION LZ4, d STRING, PRIMARY KEY(c, d))
      PARTITION BY HASH (c) PARTITIONS 3, HASH (d) PARTITIONS 3,
      RANGE (c, d) (PARTITION VALUE = (1, 'aaa'), PARTITION VALUE = (2, 'bbb'))
      STORED AS KUDU""",
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION LZ4,
        d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        PRIMARY KEY (c, d)
      )
      PARTITION BY HASH (c) PARTITIONS 3, HASH (d) PARTITIONS 3, RANGE (c, d) (...)
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""")
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE TABLE {db}.{table} (c INT, d STRING, e INT NULL DEFAULT 10, """
      """PRIMARY KEY(c, d))
      PARTITION BY RANGE (c) (PARTITION VALUES <= 1, PARTITION 1 < VALUES <= 2,
      PARTITION 2 < VALUES <= 3, PARTITION 3 < VALUES) STORED AS KUDU""",
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        d STRING NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        e INT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION DEFAULT 10,
        PRIMARY KEY (c, d)
      )
      PARTITION BY RANGE (c) (...)
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""")
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE TABLE {db}.{table} (c INT PRIMARY KEY) STORED AS KUDU""",
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        PRIMARY KEY (c)
      )
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""")
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE TABLE {db}.{table} (c INT COMMENT 'Ab 1@' PRIMARY KEY) STORED AS KUDU""",
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL {p} COMMENT 'Ab 1@',
        PRIMARY KEY (c)
      )
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""",
      extra_args={'p': self.column_properties})

  @SkipIfKudu.hms_integration_enabled()
  def test_timestamp_default_value(self, unique_database):
    """Checks that TIMESTAMP default values round-trip through SHOW CREATE TABLE
    as unix_micros_to_utc_timestamp() calls. All three sub-microsecond inputs
    below are expected to map to the same microsecond value."""
    create_sql_fmt = (
      """
      CREATE TABLE {db}.{table} (c INT, d TIMESTAMP,
      e TIMESTAMP NULL DEFAULT CAST('{ts_create}' AS TIMESTAMP),
      PRIMARY KEY(c, d))
      PARTITION BY HASH(c) PARTITIONS 3
      STORED AS KUDU""")
    # Long lines are unfortunate, but extra newlines will break the test.
    show_create_sql_fmt = (
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        d TIMESTAMP NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        e TIMESTAMP NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION """
      """DEFAULT unix_micros_to_utc_timestamp({ts_show}),
        PRIMARY KEY (c, d)
      )
      PARTITION BY HASH (c) PARTITIONS 3
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""")
    self.assert_show_create_equals(
      unique_database,
      create_sql_fmt, show_create_sql_fmt,
      extra_args={
        "ts_create": "2009-01-01 00:00:00.000001000",
        "ts_show": "1230768000000001"})
    self.assert_show_create_equals(
      unique_database,
      create_sql_fmt, show_create_sql_fmt,
      extra_args={
        "ts_create": "2009-01-01 00:00:00.000001001",
        "ts_show": "1230768000000001"})
    self.assert_show_create_equals(
      unique_database,
      create_sql_fmt, show_create_sql_fmt,
      extra_args={
        "ts_create": "2009-01-01 00:00:00.000000999",
        "ts_show": "1230768000000001"})

  @SkipIfKudu.hms_integration_enabled()
  def test_external_kudu_table_name_with_show_create(
      self, kudu_client, unique_database):
    """Check that the generated kudu.table_name tblproperty is present with
    show create table with external Kudu tables.
    """
    schema_builder = SchemaBuilder()
    column_spec = schema_builder.add_column("id", INT64)
    column_spec.nullable(False)
    schema_builder.set_primary_keys(["id"])
    partitioning = Partitioning().set_range_partition_columns(["id"])
    schema = schema_builder.build()

    kudu_table_name = self.random_table_name()
    try:
      kudu_client.create_table(kudu_table_name, schema, partitioning)
      kudu_table = kudu_client.table(kudu_table_name)

      table_name_prop = "'kudu.table_name'='%s'" % kudu_table.name
      self.assert_show_create_equals(
        unique_database,
        """
        CREATE EXTERNAL TABLE {db}.{table} STORED AS KUDU
        TBLPROPERTIES({props})""",
        """
        CREATE EXTERNAL TABLE {db}.{table}
        STORED AS KUDU
        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}', {kudu_table})""",
        do_exact_match=True,
        extra_args={
          "props": table_name_prop,
          "kudu_table": table_name_prop})
    finally:
      if kudu_client.table_exists(kudu_table_name):
        kudu_client.delete_table(kudu_table_name)

  @SkipIfKudu.hms_integration_enabled()
  def test_managed_kudu_table_name_with_show_create(self, unique_database):
    """Check that the generated kudu.table_name tblproperty is not present with
    show create table with managed Kudu tables.
    """
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE TABLE {db}.{table} (c INT PRIMARY KEY)
      PARTITION BY HASH (c) PARTITIONS 3
      STORED AS KUDU""",
      """
      CREATE TABLE {db}.{table} (
        c INT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        PRIMARY KEY (c)
      )
      PARTITION BY HASH (c) PARTITIONS 3
      STORED AS KUDU
      TBLPROPERTIES ('TRANSLATED_TO_EXTERNAL'='TRUE', """
      """'kudu.master_addresses'='{kudu_addr}')""")

  def test_synchronized_kudu_table_with_show_create(self, unique_database):
    # in this case we do exact match with the provided input since this is specifically
    # creating a synchronized table
    self.assert_show_create_equals(
      unique_database,
      """
      CREATE EXTERNAL TABLE {db}.{table} (
        id BIGINT,
        name STRING,
        PRIMARY KEY(id))
      PARTITION BY HASH PARTITIONS 16
      STORED AS KUDU
      TBLPROPERTIES('external.table.purge'='true')""",
      """
      CREATE EXTERNAL TABLE {db}.{table} (
        id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        PRIMARY KEY (id)
      )
      PARTITION BY HASH (id) PARTITIONS 16
      STORED AS KUDU
      TBLPROPERTIES ('external.table.purge'='true', """
      """'kudu.master_addresses'='{kudu_addr}')""",
      do_exact_match=True)

    self.assert_show_create_equals(
      unique_database,
      """
      CREATE EXTERNAL TABLE {db}.{table} (
        id BIGINT PRIMARY KEY,
        name STRING)
      PARTITION BY HASH(id) PARTITIONS 16
      STORED AS KUDU
      TBLPROPERTIES('external.table.purge'='true')""",
      """
      CREATE EXTERNAL TABLE {db}.{table} (
        id BIGINT NOT NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        name STRING NULL ENCODING AUTO_ENCODING COMPRESSION DEFAULT_COMPRESSION,
        PRIMARY KEY (id)
      )
      PARTITION BY HASH (id) PARTITIONS 16
      STORED AS KUDU
      TBLPROPERTIES ('external.table.purge'='true', """
      """'kudu.master_addresses'='{kudu_addr}')""",
      do_exact_match=True)
|
|
|
|
|
|
class TestDropDb(KuduTestSuite):
  """Tests DROP DATABASE semantics for databases that contain Kudu tables,
  including CASCADE and the soft-delete/recall reservation behavior."""

  @SkipIfKudu.hms_integration_enabled()
  def test_drop_non_empty_db(self, kudu_client, unique_database):
    """Check that an attempt to drop a database will fail if Kudu tables are present
    and that the tables remain.
    """
    with self.temp_kudu_table(
        kudu_client, [INT32], db_name=unique_database) as kudu_table:
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      self.execute_query(
          create_external_kudu_query(unique_database, impala_table_name, kudu_table.name))
      self.execute_query("USE DEFAULT")
      try:
        self.execute_query("DROP DATABASE %s" % unique_database)
        assert False
      except Exception as e:
        assert "One or more tables exist" in str(e)
      # The drop must have failed: the external table is still queryable.
      result = self.execute_query("SELECT COUNT(*) FROM %s.%s" % (
          unique_database, impala_table_name))
      assert result.data == ['0']

  @SkipIfKudu.hms_integration_enabled()
  def test_drop_db_cascade(self, kudu_client, unique_name):
    """Check that an attempt to drop a database will succeed even if Kudu tables are
    present and that the managed tables are removed.
    """
    self.execute_query("CREATE DATABASE " + unique_name)
    with self.temp_kudu_table(kudu_client, [INT32], db_name=unique_name) as kudu_table:
      # Create an external Kudu table
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      self.execute_query(
          create_external_kudu_query(unique_name, impala_table_name, kudu_table.name))

      # Create a managed Kudu table
      managed_table_name = self.random_table_name()
      self.execute_query("""
          CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a) PARTITIONS 3
          STORED AS KUDU""" % (unique_name, managed_table_name))
      kudu_table_name = "impala::" + unique_name + "." + managed_table_name
      assert kudu_client.table_exists(kudu_table_name)

      # Create a table in HDFS
      hdfs_table_name = self.random_table_name()
      self.execute_query("""CREATE TABLE %s.%s (a INT) PARTITIONED BY (x INT)""" % (
          unique_name, hdfs_table_name))

      self.execute_query("USE DEFAULT")
      self.execute_query("DROP DATABASE %s CASCADE" % unique_name)
      result = self.execute_query("SHOW DATABASES")
      assert unique_name not in result.data
      # The external table's backing Kudu table survives the cascade; only the
      # managed Kudu table is deleted from Kudu.
      assert kudu_client.table_exists(kudu_table.name)
      assert not kudu_client.table_exists(managed_table_name)

  @SkipIfKudu.hms_integration_enabled()
  def test_soft_drop_db_cascade(self, kudu_client, unique_name):
    """Check that an attempt to drop a database will succeed but the managed Kudu tables
    are not removed immediately if 'kudu_table_reserve_seconds' is greater than 0.
    These Kudu tables are in 'soft_deleted' state and can be recalled during the
    reservation period.
    """
    options = dict()
    self.execute_query("CREATE DATABASE " + unique_name, options)
    table_name_pattern = "managed_kudu_table_"
    for i in range(10):
      managed_table_name = table_name_pattern + str(i)
      kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_name, managed_table_name)
      # Clean up any leftover table from a previous run before re-creating it.
      if kudu_client.table_exists(kudu_tbl_name):
        kudu_client.delete_table(kudu_tbl_name)
      self.execute_query("""
          CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a) PARTITIONS 3
          STORED AS KUDU""" % (unique_name, managed_table_name), options)
      assert kudu_client.table_exists(kudu_tbl_name)

    # With a non-zero reservation, dropped Kudu tables go to 'soft_deleted'.
    options['kudu_table_reserve_seconds'] = 300
    self.execute_query("USE DEFAULT", options)
    self.execute_query("DROP DATABASE %s CASCADE" % unique_name, options)
    result = self.execute_query("SHOW DATABASES", options)
    assert unique_name not in result.data

    for i in range(10):
      kudu_tbl_name = \
          KuduTestSuite.to_kudu_table_name(unique_name, table_name_pattern + str(i))
      # Soft-deleted: the table still exists but is only visible in the
      # soft-deleted list until recalled.
      assert kudu_client.table_exists(kudu_tbl_name)
      assert kudu_tbl_name not in kudu_client.list_tables()
      assert kudu_tbl_name in kudu_client.list_soft_deleted_tables()
      table = kudu_client.table(kudu_tbl_name)
      kudu_client.recall_table(table.id)
      assert kudu_tbl_name in kudu_client.list_tables()
      assert kudu_tbl_name not in kudu_client.list_soft_deleted_tables()
|
|
|
|
|
|
class TestImpalaKuduIntegration(KuduTestSuite):
|
|
|
|
@classmethod
|
|
def default_test_protocol(cls):
|
|
# Some tests here inspect result through ImpylaHS2ResultSet.tuples().
|
|
# Make sure to test only with 'hs2' protocol and client.
|
|
return HS2
|
|
|
|
  @SkipIfKudu.hms_integration_enabled()
  def test_replace_kudu_table(self, kudu_client, unique_database):
    """Check that an external Kudu table is accessible if the underlying Kudu table is
    modified using the Kudu client.
    """
    # Create an external Kudu table
    col_names = ['a']
    with self.temp_kudu_table(kudu_client, [INT32], col_names=col_names,
        db_name=unique_database) as kudu_table:
      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
      self.execute_query(
          create_external_kudu_query(unique_database, impala_table_name, kudu_table.name))
      result = self.execute_query("DESCRIBE %s.%s" % (
          unique_database, impala_table_name))
      assert result.tuples() == \
          [("a", "int", "", "true", "true", "false", "", "AUTO_ENCODING",
            "DEFAULT_COMPRESSION", "0")]

      # Drop the underlying Kudu table and replace it with another Kudu table that has
      # the same name but different schema
      kudu_client.delete_table(kudu_table.name)
      assert not kudu_client.table_exists(kudu_table.name)
      new_col_names = ['b', 'c']
      name_parts = kudu_table.name.split(".")
      assert len(name_parts) == 2
      with self.temp_kudu_table(
          kudu_client, [STRING, STRING], col_names=new_col_names, db_name=name_parts[0],
          name=name_parts[1]) as new_kudu_table:
        assert kudu_client.table_exists(new_kudu_table.name)
        # Refresh the external table and verify that the new schema is loaded from
        # Kudu.
        self.execute_query("REFRESH %s.%s" % (
            unique_database, impala_table_name))
        result = self.execute_query("DESCRIBE %s.%s" % (
            unique_database, impala_table_name))
        # 'b' is the new primary key; 'c' is a nullable non-key column.
        assert result.tuples() == \
            [("b", "string", "", "true", "true", "false", "", "AUTO_ENCODING",
              "DEFAULT_COMPRESSION", "0"),
             ("c", "string", "", "false", "", "true", "", "AUTO_ENCODING",
              "DEFAULT_COMPRESSION", "0")]
|
|
|
|
@SkipIfKudu.hms_integration_enabled()
|
|
def test_delete_external_kudu_table(self, kudu_client, unique_database):
|
|
"""Check that Impala can recover from the case where the underlying Kudu table of
|
|
an external table is dropped using the Kudu client.
|
|
"""
|
|
with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
|
|
# Create an external Kudu table
|
|
impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
|
|
self.execute_query(
|
|
create_external_kudu_query(unique_database, impala_table_name, kudu_table.name))
|
|
result = self.execute_query("DESCRIBE %s.%s" % (
|
|
unique_database, impala_table_name))
|
|
assert result.tuples() == \
|
|
[("a", "int", "", "true", "true", "false", "", "AUTO_ENCODING",
|
|
"DEFAULT_COMPRESSION", "0")]
|
|
# Drop the underlying Kudu table
|
|
kudu_client.delete_table(kudu_table.name)
|
|
assert not kudu_client.table_exists(kudu_table.name)
|
|
err_msg = 'the table does not exist: table_name: "%s"' % (kudu_table.name)
|
|
try:
|
|
self.execute_query("REFRESH %s.%s" % (unique_database, impala_table_name))
|
|
except Exception as e:
|
|
assert err_msg in str(e)
|
|
self.execute_query("DROP TABLE %s.%s" % (unique_database, impala_table_name))
|
|
result = self.execute_query("SHOW TABLES IN " + unique_database)
|
|
assert (impala_table_name,) not in result.tuples()
|
|
|
|
@SkipIfKudu.hms_integration_enabled()
|
|
def test_delete_managed_kudu_table(self, kudu_client, unique_database):
|
|
"""Check that dropping a managed Kudu table works even if the underlying Kudu table
|
|
has been dropped externally."""
|
|
impala_tbl_name = "foo"
|
|
self.execute_query("""CREATE TABLE %s.%s (a INT PRIMARY KEY) PARTITION BY HASH (a)
|
|
PARTITIONS 3 STORED AS KUDU""" % (unique_database, impala_tbl_name))
|
|
kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, impala_tbl_name)
|
|
assert kudu_client.table_exists(kudu_tbl_name)
|
|
kudu_client.delete_table(kudu_tbl_name)
|
|
assert not kudu_client.table_exists(kudu_tbl_name)
|
|
self.execute_query("DROP TABLE %s.%s" % (unique_database, impala_tbl_name))
|
|
result = self.execute_query("SHOW TABLES IN %s" % unique_database)
|
|
assert (impala_tbl_name,) not in result.tuples()
|
|
|
|
@SkipIfKudu.hms_integration_enabled()
|
|
def test_soft_delete_kudu_table(self, kudu_client, unique_database):
|
|
"""Check that the query option 'kudu_table_reserve_seconds' works for managed Kudu
|
|
table. If it is greater than 0, the underlying Kudu will not be deleted immediately.
|
|
During the reservation period, the Kudu table can be recalled."""
|
|
options = dict()
|
|
tbl = "foo"
|
|
db_tbl = unique_database + "." + tbl
|
|
kudu_tbl_name = KuduTestSuite.to_kudu_table_name(unique_database, tbl)
|
|
if kudu_client.table_exists(kudu_tbl_name):
|
|
kudu_client.delete_table(kudu_tbl_name)
|
|
self.execute_query("""CREATE TABLE %s (a INT PRIMARY KEY) PARTITION BY HASH (a)
|
|
PARTITIONS 3 STORED AS KUDU""" % db_tbl, options)
|
|
assert kudu_client.table_exists(kudu_tbl_name)
|
|
|
|
options["kudu_table_reserve_seconds"] = 300
|
|
self.execute_query("DROP TABLE %s" % db_tbl, options)
|
|
result = self.execute_query("SHOW TABLES IN %s" % unique_database, options)
|
|
assert (db_tbl,) not in result.tuples()
|
|
|
|
assert kudu_client.table_exists(kudu_tbl_name)
|
|
assert kudu_tbl_name not in kudu_client.list_tables()
|
|
assert kudu_tbl_name in kudu_client.list_soft_deleted_tables()
|
|
|
|
table = kudu_client.table(kudu_tbl_name)
|
|
kudu_client.recall_table(table.id)
|
|
assert kudu_tbl_name in kudu_client.list_tables()
|
|
assert kudu_tbl_name not in kudu_client.list_soft_deleted_tables()
|
|
|
|
|
|
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
class TestKuduMemLimits(KuduTestSuite):
  """Runs a fixed set of Kudu scan queries under several mem_limit settings and
  verifies each query either succeeds or fails cleanly with a
  "Memory limit exceeded" error."""

  # One query per entry in QUERY_MEM_LIMITS below.
  QUERIES = ["select * from tpch_kudu.lineitem where l_orderkey = -1",
             "select * from tpch_kudu.lineitem where l_commitdate like '%cheese'",
             "select * from tpch_kudu.lineitem limit 90"]

  # The value indicates the minimum memory requirements for the queries above, the first
  # memory limit corresponds to the first query
  MB = 1024 * 1024
  QUERY_MEM_LIMITS = [MB, MB, 10 * MB]

  @classmethod
  def add_test_dimensions(cls):
    super(TestKuduMemLimits, cls).add_test_dimensions()
    # Run every query at a tight (1MB), a medium (10MB) and an unlimited (0)
    # mem_limit.
    add_exec_option_dimension(cls, "mem_limit", [cls.MB, 10 * cls.MB, 0])
    # IMPALA-9856: We disable query result spooling so that this test can run queries
    # with low mem_limit.
    add_mandatory_exec_option(cls, 'spool_query_results', False)

  @pytest.mark.execute_serially
  def test_low_mem_limit_low_selectivity_scan(self, vector):
    """Tests that the queries specified in this test suite run under the given
    memory limits."""
    for i, q in enumerate(self.QUERIES):
      try:
        self.execute_query(q, vector.get_exec_option_dict())
      except Exception as e:
        # A failure is only acceptable when the limit is at or below the query's
        # documented minimum, and it must be a clean OOM error.
        if (vector.get_exec_option('mem_limit') > self.QUERY_MEM_LIMITS[i]):
          raise
        assert "Memory limit exceeded" in str(e)

    # IMPALA-4654: Validate the fix for a bug where LimitReached() wasn't respected in
    # the KuduScanner and the limit query above would result in a fragment running an
    # additional minute. This ensures that the num fragments 'in flight' reaches 0 in
    # less time than IMPALA-4654 was reproducing (~60sec) but yet still enough time that
    # this test won't be flaky.
    verifiers = [MetricVerifier(i.service)
                 for i in ImpalaCluster.get_e2e_test_cluster().impalads]
    for v in verifiers:
      v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=30)
|
|
|
|
|
|
@SkipIfHive2.create_external_kudu_table
class TestCreateSynchronizedTable(KuduTestSuite):
  """Tests for synchronized Kudu tables: EXTERNAL tables created with
  'external.table.purge'='true', whose underlying Kudu table is created and
  dropped by Impala like a managed table."""

  def test_create_synchronized_table(self, kudu_client, unique_database):
    """
    Creates a synchronized Kudu table and makes sure that the statement does not fail.
    """
    table_name = self.random_table_name()
    # create a external kudu table with external.table.purge=true
    self.execute_query("""
      CREATE EXTERNAL TABLE {}.{} (
        id int PRIMARY KEY,
        name string)
      PARTITION BY HASH PARTITIONS 8
      STORED AS KUDU
      TBLPROPERTIES ('external.table.purge'='true')
    """.format(unique_database, table_name))
    # make sure that the table was created
    result = self.execute_query("SHOW TABLES IN {}".format(unique_database))
    assert table_name in result.data
    # make sure that the kudu table was created with default name
    assert kudu_client.table_exists(self.to_kudu_table_name(unique_database, table_name))
    # make sure that the external.table.purge property can be changed
    self.execute_query(
      "ALTER TABLE {}.{} set TBLPROPERTIES ('external.table.purge'='FALSE')".format(
        unique_database, table_name))
    result = self.execute_query("SHOW TABLES IN {}".format(unique_database))
    assert table_name in result.data
    self.execute_query(
      "ALTER TABLE {}.{} set TBLPROPERTIES ('external.table.purge'='TRUE')".format(
        unique_database, table_name))
    result = self.execute_query("SHOW TABLES IN {}".format(unique_database))
    assert table_name in result.data
    # make sure that table can be renamed
    new_table_name = self.random_table_name()
    self.execute_query(
      "ALTER TABLE {}.{} rename to {}.{}".format(
        unique_database, table_name, unique_database, new_table_name))
    result = self.execute_query("SHOW TABLES IN {}".format(unique_database))
    assert new_table_name in result.data
    # make sure the backing Kudu table follows the rename (default naming scheme)
    assert kudu_client.table_exists(
      self.to_kudu_table_name(unique_database, new_table_name))
    # now make sure that table disappears after we remove it
    self.execute_query("DROP TABLE {}.{}".format(unique_database, new_table_name))
    result = self.execute_query("SHOW TABLES IN {}".format(unique_database))
    assert new_table_name not in result.data
    # the Kudu table must be purged too since external.table.purge is true
    assert not kudu_client.table_exists(
      self.to_kudu_table_name(unique_database, new_table_name))

  def test_invalid_sync_table_stmts(self, unique_database):
    """
    Test makes sure that an invalid way to create a synchronized table errors out.
    """
    table_name = self.random_table_name()
    try:
      # EXTERNAL + external.table.purge=false is not a valid combination.
      self.execute_query("""
        CREATE EXTERNAL TABLE %s.%s (
          a int PRIMARY KEY)
        PARTITION BY HASH PARTITIONS 8
        STORED AS KUDU
        TBLPROPERTIES ('external.table.purge'='false')
        """ % (unique_database, table_name))
      assert False, \
          "Create table statement with external.table.purge=False should error out"
    except Exception as e:
      # We throw this exception since the analyzer checks for properties one by one.
      # This is the first property that it checks for an external table
      assert "Table property kudu.table_name must be specified when " \
             "creating an external Kudu table" in str(e)

    try:
      # missing external.table.purge in TBLPROPERTIES
      self.execute_query("""
        CREATE EXTERNAL TABLE %s.%s (
          a int PRIMARY KEY)
        PARTITION BY HASH PARTITIONS 8
        STORED AS KUDU
        TBLPROPERTIES ('FOO'='BAR')
        """ % (unique_database, table_name))
      assert False, \
          "Create external table statement must include external.table.purge property"
    except Exception as e:
      # We throw this exception since the analyzer checks for properties one by one.
      # This is the first property that it checks for an external table
      assert "Table property kudu.table_name must be specified when " \
             "creating an external Kudu table" in str(e)

    try:
      # Trying to create a managed table with external.purge.table property in it
      self.execute_query("""
        CREATE TABLE %s.%s (
          a int PRIMARY KEY)
        PARTITION BY HASH PARTITIONS 8
        STORED AS KUDU
        TBLPROPERTIES ('external.table.purge'='true')
        """ % (unique_database, table_name))
      assert False, \
          "Managed table creation with external.table.purge property must be disallowed"
    except Exception as e:
      # NOTE: the asserted text matches the server-side error message verbatim,
      # including its grammar.
      assert "Table property 'external.table.purge' cannot be set to true " \
             "with an managed Kudu table." in str(e)

    # TODO should we block this?
    # A managed table with external.table.purge=False is currently accepted.
    self.execute_query("""
      CREATE TABLE %s.%s (
        a int PRIMARY KEY)
      PARTITION BY HASH PARTITIONS 8
      STORED AS KUDU
      TBLPROPERTIES ('external.table.purge'='False')""" % (unique_database, table_name))
    result = self.execute_query("SHOW TABLES IN %s" % unique_database)
    assert table_name in result.data

  def test_sync_tbl_with_kudu_table(self, kudu_client, unique_database):
    """
    Test tries to create a synchronized table with an existing Kudu table name and
    makes sure it fails.
    """
    create_query = (
      """
      CREATE EXTERNAL TABLE {}.{} (
        a int PRIMARY KEY)
      PARTITION BY HASH PARTITIONS 8
      STORED AS KUDU
      TBLPROPERTIES('external.table.purge'='true', 'kudu.table_name' = '{}')"""
    )
    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
      table_name = self.random_table_name()
      try:
        self.execute_query(
          create_query.format(
            unique_database, table_name, self.get_kudu_table_base_name(kudu_table.name)))
        assert False, ("External tables with external.purge.table property must fail "
                       "if the kudu table already exists")
      except Exception as e:
        assert ("Not allowed to set 'kudu.table_name' manually for"
                " synchronized Kudu tables") in str(e)
|
|
|
|
|
|
class TestKuduReadTokenSplit(KuduTestSuite):
  """
  This suite verifies impala's integration of Kudu's split token API: higher
  mt_dop and a small targeted_kudu_scan_range_length should yield more scanner
  instances in the query plan.
  """

  @classmethod
  def add_test_dimensions(cls):
    super(TestKuduReadTokenSplit, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  @SkipIfKudu.no_hybrid_clock()
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_kudu_scanner(self, vector):
    """This runs explain query with variations of mt_dop and
    targeted_kudu_scan_range_length to verify targeted_kudu_scan_range_length's
    functionality.
    Test disabled for EC since the erasure coded files when loaded in kudu
    during data load cause the expected behaviour to change"""
    explain_query = "explain select * from tpch_kudu.lineitem "
    # Every generated plan is collected here so assertion failures show context.
    plans = []

    # Baseline: no mt_dop, no scan range length override.
    regular_num_inst = self.__get_num_scanner_instances(
        explain_query, vector, mt_dop=None, targeted_kudu_scan_range_length=None,
        plans=plans)

    # mt_dop=1 should match the baseline instance count.
    mt_dop_1_num_inst = self.__get_num_scanner_instances(
        explain_query, vector, mt_dop=1, targeted_kudu_scan_range_length=None,
        plans=plans)

    # targeted_kudu_scan_range_length should be disabled by default and num instances
    # will be equal to the number of partitions
    with_mt_dop_num_inst = self.__get_num_scanner_instances(
        explain_query, vector, mt_dop=10, targeted_kudu_scan_range_length=None,
        plans=plans)

    # This will result is more splits
    with_mt_dop_and_low_range_len_num_inst = self.__get_num_scanner_instances(
        explain_query, vector, mt_dop=10, targeted_kudu_scan_range_length="8mb",
        plans=plans)

    assert mt_dop_1_num_inst == regular_num_inst, str(plans)
    assert regular_num_inst < with_mt_dop_num_inst, str(plans)
    assert with_mt_dop_num_inst < with_mt_dop_and_low_range_len_num_inst, str(plans)

  def __get_num_scanner_instances(self, explain_query, vector, mt_dop,
                                  targeted_kudu_scan_range_length, plans):
    """This is a helper method that runs the explain query with the provided query
    options (mt_dop and targeted_kudu_scan_range_length). Appends the generated plan to
    'plans' and returns the num of kudu scanner instances """
    # Extracts the instance count of the scan fragment from the explain output.
    regex = r'F00:PLAN FRAGMENT \[RANDOM\] hosts=3 instances=([0-9]+)'
    # The default read mode of READ_LATEST does not provide high enough consistency for
    # these tests.
    client = self.create_impala_client(protocol=vector.get_value('protocol'))
    client.set_configuration_option("kudu_read_mode", "READ_AT_SNAPSHOT")
    client.set_configuration_option("explain_level", 3)
    if targeted_kudu_scan_range_length:
      client.set_configuration_option(
        "targeted_kudu_scan_range_length", targeted_kudu_scan_range_length)
    if mt_dop:
      client.set_configuration_option("mt_dop", mt_dop)
    result = client.execute(explain_query)
    plan = "\n".join(result.data)
    plans.append(plan)
    matches = re.search(regex, plan)
    assert len(matches.groups()) == 1, plan
    client.clear_configuration()
    return int(matches.group(1))
|
|
|
|
|
|
@SkipIfHive2.create_external_kudu_table
class TestKuduInsertWithBufferedTupleDesc(KuduTestSuite):
  """
  This test verifies bug fixing for IMPALA-11029.
  """

  # queries to create Kudu tables.
  _create_kudu_table_1_query = "CREATE TABLE {0} (id1 INT NOT NULL, " \
      "agrmt INT NOT NULL, big_id BIGINT NOT NULL, outdated_flag STRING NOT NULL, " \
      "mod_ts TIMESTAMP NOT NULL, PRIMARY KEY (id1, agrmt)) " \
      "PARTITION BY HASH (id1) PARTITIONS 2 STORED AS KUDU"

  _create_kudu_table_2_query = "CREATE TABLE {0} (cl_id INT NOT NULL, " \
      "cl_agrmt INT NOT NULL, outdat STRING NULL, mod_dat TIMESTAMP NULL, " \
      "PRIMARY KEY (cl_id, cl_agrmt)) " \
      "PARTITION BY HASH (cl_id) PARTITIONS 2 STORED AS KUDU"

  # query to insert rows to Kudu table. The shape of this statement (analytic
  # function plus multiple self-joins) is what reproduced IMPALA-11029.
  _insert_query = "INSERT INTO {0} (id1, agrmt, big_id, outdated_flag, mod_ts) " \
      "SELECT i.cl_id, cast(row_number() over(order by null) as int), i.cl_agrmt, 'Y', " \
      "case when outdat='Y' and i.mod_dat is not null then i.mod_dat else now() end " \
      "from {1} i left join {0} u on u.big_id=i.cl_agrmt " \
      "left join (select id1, big_id from {0} group by id1, big_id) uu " \
      "on uu.big_id=i.cl_agrmt " \
      "where u.big_id is null"

  @SkipIfKudu.no_hybrid_clock()
  def test_kudu_insert_with_buffered_tuple_desc(self, kudu_client, unique_database):
    """Regression test for IMPALA-11029: the INSERT below used to fail with
    'IllegalStateException: null'."""
    # Create Kudu tables.
    table_1_name = "%s.tab1" % unique_database
    self.execute_query(self._create_kudu_table_1_query.format(table_1_name))
    assert kudu_client.table_exists(
      KuduTestSuite.to_kudu_table_name(unique_database, "tab1"))
    table_2_name = "%s.tab2" % unique_database
    self.execute_query(self._create_kudu_table_2_query.format(table_2_name))
    assert kudu_client.table_exists(
      KuduTestSuite.to_kudu_table_name(unique_database, "tab2"))

    # Insert rows
    try:
      self.execute_query(self._insert_query.format(table_1_name, table_2_name))
      # Both source tables are empty, so the insert must produce no rows.
      result = self.execute_query("SELECT * FROM %s" % table_1_name)
      assert len(result.data) == 0
    except Exception as e:
      # Not expect to throw exception like "IllegalStateException: null"
      assert False, str(e)
|
|
|
|
|
|
class TestKuduArray(KuduTestSuite):
|
|
"""
|
|
  Tests Kudu 1-D array support.
|
|
"""
|
|
|
|
def _get_name_from_type(self, data_type):
|
|
return re.split("[(<)]", data_type)[0]
|
|
|
|
  def _insert_arrays_into_kudu(self, kudu_table_name):
    """Populate 'kudu_table_name' with the test arrays by running the native
    kudu-array-inserter helper (see be/src/exec/kudu/kudu-array-inserter.cc);
    Impala cannot insert complex types itself, so insertion goes through the
    Kudu C++ client. Raises CalledProcessError if the helper fails."""
    exec_path = os.environ["IMPALA_HOME"] + \
        "/be/build/latest/exec/kudu/kudu-array-inserter"
    # Log the exact command line so failures are reproducible from the test logs.
    self.client.log_client(str([exec_path, kudu_table_name]))
    subprocess.check_call([exec_path, kudu_table_name])
|
|
|
|
def _check_table_schema(self, db, table_name, types):
|
|
result = self.execute_query("DESCRIBE {0}.{1}".format(db, table_name))
|
|
assert ("id", "tinyint") == result.tuples()[0][:2]
|
|
for i in range(1, len(result.tuples())):
|
|
(col_name, col_type) = result.tuples()[i][:2]
|
|
assert (col_type == "array<{0}>".format(types[i - 1].lower())
|
|
and col_name == "array_" + self._get_name_from_type(types[i - 1]).lower())
|
|
|
|
  # See be/src/exec/kudu/kudu-array-inserter.cc for the test data.
  # Maps item type -> serialized column values for rows id=0..6: row 0 is a
  # NULL array, row 1 has 3 items, row 2 is empty, rows 3-5 have a NULL item at
  # position 0/1/2 respectively, and row 6 has 5 items.
  EXPECTED_COLUMNS = {
    "INT": (
      None,
      '[-2147483648,-1,2147483647]',
      '[]',
      '[null,-1,2147483647]',
      '[-2147483648,null,2147483647]',
      '[-2147483648,-1,null]',
      '[-2147483648,-1,2147483647,-2147483648,-1]',
    ),
    "TIMESTAMP": (
      None,
      ('["1400-01-01 00:00:00",'
       '"1969-12-31 23:59:59.999999000",'
       '"9999-12-31 23:59:59.999999000"]'),
      '[]',
      '[null,"1969-12-31 23:59:59.999999000","9999-12-31 23:59:59.999999000"]',
      '["1400-01-01 00:00:00",null,"9999-12-31 23:59:59.999999000"]',
      '["1400-01-01 00:00:00","1969-12-31 23:59:59.999999000",null]',
      ('["1400-01-01 00:00:00",'
       '"1969-12-31 23:59:59.999999000",'
       '"9999-12-31 23:59:59.999999000",'
       '"1400-01-01 00:00:00",'
       '"1969-12-31 23:59:59.999999000"]'),
    ),
    # The output of the ARRAY<VARCHAR(1)> data are NOT valid UTF-8 strings.
    "VARCHAR(1)": (
      None,
      b'["\xce","\xcf","\xce"]',
      '[]',
      b'[null,"\xcf","\xce"]',
      b'["\xce",null,"\xce"]',
      b'["\xce","\xcf",null]',
      b'["\xce","\xcf","\xce","\xce","\xcf"]',
    ),
    "DECIMAL(18,18)": (
      None,
      '[-0.999999999999999999,-0.000000000000000001,0.999999999999999999]',
      '[]',
      '[null,-0.000000000000000001,0.999999999999999999]',
      '[-0.999999999999999999,null,0.999999999999999999]',
      '[-0.999999999999999999,-0.000000000000000001,null]',
      ('[-0.999999999999999999,-0.000000000000000001,0.999999999999999999,'
       '-0.999999999999999999,-0.000000000000000001]'),
    ),
    "DOUBLE": (
      None,
      '[-Infinity,NaN,Infinity]',
      '[]',
      '[null,NaN,Infinity]',
      '[-Infinity,null,Infinity]',
      '[-Infinity,NaN,null]',
      '[-Infinity,NaN,Infinity,-Infinity,NaN]',
    ),
    # The output of each element in an ARRAY<BINARY> is Base64 encoded.
    "BINARY": (
      None,
      '["zqM=","z4A=","zrs="]',
      '[]',
      '[null,"z4A=","zrs="]',
      '["zqM=",null,"zrs="]',
      '["zqM=","z4A=",null]',
      '["zqM=","z4A=","zrs=","zqM=","z4A="]',
    ),
    "BOOLEAN": (
      None,
      '[true,false,true]',
      '[]',
      '[null,false,true]',
      '[true,null,true]',
      '[true,false,null]',
      '[true,false,true,true,false]',
    ),
  }
|
|
|
|
def _check_table_data(self, db, table_name, types, query_options):
|
|
columns = ", ".join([
|
|
"array_{0}".format(self._get_name_from_type(item_type))
|
|
for item_type in types
|
|
])
|
|
result = self.execute_query("SELECT id, {0} FROM {1}.{2}".format(
|
|
columns, db, table_name), query_options=query_options)
|
|
for i, result_column in enumerate(zip(*result.tuples())):
|
|
if i == 0:
|
|
assert result_column == tuple(range(len(result.tuples())))
|
|
else:
|
|
assert result_column == self.EXPECTED_COLUMNS[types[i - 1]]
|
|
|
|
def _unnest_expected_column(self, item_type):
|
|
if item_type == "VARCHAR(1)":
|
|
return (
|
|
b'\xce', b'\xcf', b'\xce',
|
|
None, b'\xcf', b'\xce',
|
|
b'\xce', None, b'\xce',
|
|
b'\xce', b'\xcf', None,
|
|
b'\xce', b'\xcf', b'\xce', b'\xce', b'\xcf',
|
|
)
|
|
result = []
|
|
for serialized_array in self.EXPECTED_COLUMNS[item_type]:
|
|
if serialized_array is not None:
|
|
array = json.loads(
|
|
serialized_array,
|
|
parse_float=(lambda s: s) if item_type.startswith("DECIMAL") else None)
|
|
if item_type == "BINARY":
|
|
result += [b64decode(elem) if elem is not None else None for elem in array]
|
|
else:
|
|
result += array
|
|
return tuple(result)
|
|
|
|
  # The 'id' value repeated once per unnested array item: ids 1, 3, 4 and 5
  # contribute 3 items each, id 6 contributes 5; id 0 (NULL array) and id 2
  # (empty array) contribute none.
  EXPECTED_ID_UNNESTED = (
    1, 1, 1,
    3, 3, 3,
    4, 4, 4,
    5, 5, 5,
    6, 6, 6, 6, 6
  )
|
|
|
|
  def _check_unnest(self, db, table_name, types, query_options, in_select_list):
    """Verify UNNEST() results for every array_<type> column in 'types'.

    If 'in_select_list' is True the arrays are unnested via UNNEST() in the
    select list, otherwise via UNNEST(...) in the FROM clause. Either way the
    'id' column must repeat once per array item (EXPECTED_ID_UNNESTED) and each
    unnested column must match _unnest_expected_column()."""
    if in_select_list:
      columns = ", ".join([
        "UNNEST(array_{0})".format(self._get_name_from_type(item_type))
        for item_type in types
      ])
      result = self.execute_query("SELECT id, {0} FROM {1}.{2}".format(
          columns, db, table_name), query_options=query_options)
    else:
      # In the FROM-clause form the array columns must be qualified with the
      # table name, hence the different format arguments below.
      columns = ", ".join([
        "{0}.array_{1}".format(table_name, self._get_name_from_type(item_type))
        for item_type in types
      ])
      result = self.execute_query("SELECT * FROM {1}.{2}, UNNEST({0})".format(
          columns, db, table_name), query_options=query_options)
    for i, result_column in enumerate(zip(*result.tuples())):
      if i == 0:
        assert result_column == self.EXPECTED_ID_UNNESTED
      elif types[i - 1] == "DOUBLE":
        # NaN cannot be compared directly.
        assert str(result_column) == str(self._unnest_expected_column(types[i - 1]))
      else:
        assert result_column == self._unnest_expected_column(types[i - 1])
|
|
|
|
def _check_non_materialzied_elements(self, db, table_name, item_type, options):
|
|
result = self.execute_query("SELECT id FROM {0}.{1} AS t, t.array_{2}".format(
|
|
db, table_name, self._get_name_from_type(item_type)), query_options=options)
|
|
for result_column in zip(*result.tuples()):
|
|
assert result_column == self.EXPECTED_ID_UNNESTED
|
|
|
|
  def test_supported_types(self, unique_database, vector):
    """
    Test array column support for kudu against [unique_database].kudu_array
    and external table [unique_database].kudu_array_external: schema via
    DESCRIBE, raw data, UNNEST() in the select list and the FROM clause,
    non-materialized array items, and the error messages of the currently
    unsupported query shapes (IMPALA-14538, IMPALA-14539).
    """
    db = unique_database
    options = vector.get_value('exec_option')
    SUPPORTED_ITEM_TYPES = [
      "INT",
      "TIMESTAMP",
      "VARCHAR(1)",
      "DECIMAL(18,18)",
      "DOUBLE",
      "BINARY",
      "BOOLEAN",
    ]
    TEST_TABLE, TEST_EXTERNAL_TABLE = "kudu_array", "kudu_array_external"
    column_defs = ", ".join([
      "array_{0} ARRAY<{1}>".format(self._get_name_from_type(item_type), item_type)
      for item_type in SUPPORTED_ITEM_TYPES
    ])
    # The table is unpartitioned to ensure the rows are read in the insertion order.
    create_table_sql = (
      "CREATE TABLE {0}.{1} (id TINYINT PRIMARY KEY, {2}) "
      "STORED AS KUDU"
    )

    # Create table [unique_database].kudu_array
    self.execute_query(create_table_sql.format(db, TEST_TABLE, column_defs))
    self._check_table_schema(db, TEST_TABLE, SUPPORTED_ITEM_TYPES)

    # Create external table [unique_database].kudu_array_external pointing to the same
    # table as [unique_database].kudu_array and check the schema as well.
    kudu_table_name = "impala::{0}.{1}".format(db, TEST_TABLE)
    self.execute_query(create_external_kudu_query(
      db, TEST_EXTERNAL_TABLE, kudu_table_name))
    self._check_table_schema(
      db, TEST_EXTERNAL_TABLE, SUPPORTED_ITEM_TYPES)

    # Insert some rows using kudu-array-inserter and read the data back
    # through both tables.
    self._insert_arrays_into_kudu(kudu_table_name)
    self._check_table_data(
      db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options)
    self._check_table_data(
      db, TEST_EXTERNAL_TABLE, SUPPORTED_ITEM_TYPES, options)

    # Check the result of UNNEST().
    self._check_unnest(
      db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options, in_select_list=True)
    self._check_unnest(
      db, TEST_TABLE, SUPPORTED_ITEM_TYPES, options, in_select_list=False)

    # Check the result when the array elements are not materialized.
    self._check_non_materialzied_elements(db, TEST_TABLE, "BINARY", options)

    # TODO(IMPALA-14539): Support duplicate collection slots.
    sql = "SELECT array_{2} FROM {0}.{1}, {1}.array_{2} AS unnested"
    exc = str(self.execute_query_expect_failure(self.client, sql.format(
      db, TEST_TABLE, "INT")))
    assert (
      "Unable to deserialize scan token for node with id '0' for Kudu table "
      "'impala::{0}.{1}': Invalid argument: Duplicate column name: array_{2}"
    ).format(db, TEST_TABLE, "int") in exc

    # TODO(IMPALA-14538): Support referencing a Kudu collection column as a table.
    sql = "SELECT pos, item FROM {0}.{1}.array_{2}"
    exc = str(self.execute_query_expect_failure(self.client, sql.format(
      db, TEST_TABLE, "INT")))
    assert (
      "AnalysisException: "
      "Referencing a Kudu collection column as a table is not supported."
    ) in exc
|