impala/tests/custom_cluster/test_ext_data_sources.py
stiga-huang aa4f19219c IMPALA-14546: Fix jdbc driver urls in test_postgres_jdbc_tables
TestPostgresJdbcTables.test_postgres_jdbc_tables uses a hardcoded path
for the JDBC driver URL:
"/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar".
This doesn't work when running on Ozone, where the URL needs the
prefix "ofs://localhost:9862/impala".

This patch fixes the issue by constructing the driver URL with
FILESYSTEM_PREFIX, which is "ofs://localhost:9862/impala" on Ozone.
See bin/impala-config.sh for how it is set for different filesystems.
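
For example, with FILESYSTEM_PREFIX="ofs://localhost:9862/impala" the
constructed URL becomes
"ofs://localhost:9862/impala/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar";
on the default HDFS minicluster FILESYSTEM_PREFIX is empty, so the
original path is used unchanged.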

Tests:
 - Ran the test on Ozone.

Change-Id: Ie0c4368b3262d4dcb9e1c05475506411be2e2ef5
Reviewed-on: http://gerrit.cloudera.org:8080/23787
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Reviewed-by: Riza Suminto <riza.suminto@cloudera.com>
2025-12-15 20:09:33 +00:00


# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import os
import pytest
import requests
import subprocess
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.environ import build_flavor_timeout
from tests.common.skip import SkipIfApacheHive
from tests.common.test_dimensions import create_exec_option_dimension
from tests.util.filesystem_utils import FILESYSTEM_PREFIX
from time import sleep


class TestExtDataSources(CustomClusterTestSuite):
"""Impala query tests for external data sources."""
@classmethod
def add_test_dimensions(cls):
super(TestExtDataSources, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
exec_single_node_option=[100]))
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal")
def test_data_source_tables(self, vector, unique_database, unique_name):
"""Start Impala cluster in LocalCatalog Mode"""
self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database,
test_file_vars={'$UNIQUE_DATASOURCE': unique_name})
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args="--use_local_catalog=true",
catalogd_args="--catalog_topic_mode=minimal")
def test_jdbc_data_source(self, vector, unique_database):
"""Start Impala cluster in LocalCatalog Mode"""
self.run_test_case('QueryTest/jdbc-data-source', vector, use_db=unique_database)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--data_source_batch_size=2048')
def test_data_source_big_batch_size(self, vector, unique_database, unique_name):
"""Run test with batch size greater than default size 1024"""
self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database,
test_file_vars={'$UNIQUE_DATASOURCE': unique_name})
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
impalad_args='--data_source_batch_size=512')
def test_data_source_small_batch_size(self, vector, unique_database, unique_name):
"""Run test with batch size less than default size 1024"""
self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database,
test_file_vars={'$UNIQUE_DATASOURCE': unique_name})
@SkipIfApacheHive.data_connector_not_supported
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
statestored_args="--statestore_update_frequency_ms=1000")
def test_restart_catalogd(self):
"""Restart Catalog server after creating a data source. Verify that the data source
object is persistent across restarting of Catalog server."""
DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_restart_persistent"
CREATE_DATA_SOURCE_QUERY = "CREATE DATA SOURCE test_restart_persistent " \
"LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar' " \
"CLASS 'org.apache.impala.extdatasource.jdbc.PersistentJdbcDataSource' " \
"API_VERSION 'V1'"
SHOW_DATA_SOURCE_QUERY = "SHOW DATA SOURCES LIKE 'test_restart_*'"
# Create a data source and verify that the object is created successfully.
self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
self.execute_query_expect_success(self.client, CREATE_DATA_SOURCE_QUERY)
result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
assert result.success, str(result)
assert "PersistentJdbcDataSource" in result.get_data()
# Restart Catalog server.
self.cluster.catalogd.restart()
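    # build_flavor_timeout returns the longer timeout on slow build flavors (e.g.
    # debug or sanitizer builds), giving the cluster more time to re-register.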
wait_time_s = build_flavor_timeout(90, slow_build_timeout=180)
self.cluster.statestored.service.wait_for_metric_value('statestore.live-backends',
expected_value=4, timeout=wait_time_s)
# Verify that the data source object is still available after restarting Catalog
# server.
result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
assert result.success, str(result)
assert "PersistentJdbcDataSource" in result.get_data()
# Remove the data source
self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
assert result.success, str(result)
assert "PersistentJdbcDataSource" not in result.get_data()
@SkipIfApacheHive.data_connector_not_supported
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
statestored_args="--use_subscriber_id_as_catalogd_priority=true "
"--statestore_heartbeat_frequency_ms=1000",
catalogd_args="--catalogd_ha_reset_metadata_on_failover=false "
"--enable_reload_events=true",
start_args="--enable_catalogd_ha")
def test_catalogd_ha_failover(self):
"""The test case for cluster started with catalogd HA enabled."""
DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_failover_persistent"
CREATE_DATA_SOURCE_QUERY = "CREATE DATA SOURCE test_failover_persistent " \
"LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar' " \
"CLASS 'org.apache.impala.extdatasource.jdbc.FailoverInSyncJdbcDataSource' " \
"API_VERSION 'V1'"
SHOW_DATA_SOURCE_QUERY = "SHOW DATA SOURCES LIKE 'test_failover_*'"
# Verify two catalogd instances are created with one as active.
catalogds = self.cluster.catalogds()
    assert len(catalogds) == 2
catalogd_service_1 = catalogds[0].service
catalogd_service_2 = catalogds[1].service
    assert catalogd_service_1.get_metric_value("catalog-server.active-status")
    assert not catalogd_service_2.get_metric_value("catalog-server.active-status")
# Create a data source and verify that the object is created successfully.
self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
self.execute_query_expect_success(self.client, CREATE_DATA_SOURCE_QUERY)
result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
assert result.success, str(result)
assert "FailoverInSyncJdbcDataSource" in result.get_data()
# Kill active catalogd
catalogds[0].kill()
    # Wait long enough for the statestore to detect the failure of the active
    # catalogd and assign the active role to the standby catalogd.
catalogd_service_2.wait_for_metric_value(
"catalog-server.active-status", expected_value=True, timeout=30)
    assert catalogd_service_2.get_metric_value("catalog-server.active-status")
    # Wait until the coordinator receives the failover notification.
coordinator_service = self.cluster.impalads[0].service
expected_catalog_service_port = catalogd_service_2.get_catalog_service_port()
received_failover_notification = False
retry_count = 30
    while retry_count > 0:
      active_catalogd_address = \
          coordinator_service.get_metric_value("catalog.active-catalogd-address")
      _, catalog_service_port = active_catalogd_address.split(":")
      if int(catalog_service_port) == expected_catalog_service_port:
        received_failover_notification = True
        break
      retry_count -= 1
      sleep(1)
assert received_failover_notification, \
"Coordinator did not receive notification of Catalog service failover."
    # Verify that the data source object is available from the new active catalogd
    # of the HA pair.
result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
assert result.success, str(result)
assert "FailoverInSyncJdbcDataSource" in result.get_data()
# Remove the data source
self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY)
result = self.execute_query(SHOW_DATA_SOURCE_QUERY)
assert result.success, str(result)
assert "FailoverInSyncJdbcDataSource" not in result.get_data()


class TestPostgresJdbcTables(CustomClusterTestSuite):
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def setup_class(cls):
super(TestPostgresJdbcTables, cls).setup_class()
@pytest.mark.execute_serially
def test_postgres_jdbc_tables(self, vector, unique_database):
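    # FILESYSTEM_PREFIX is empty on HDFS and e.g. "ofs://localhost:9862/impala" on
    # Ozone (see bin/impala-config.sh), so the driver jar resolves on both filesystems.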
    driver_url = FILESYSTEM_PREFIX + \
        "/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar"
sql = """
DROP TABLE IF EXISTS {0}.country_postgres;
CREATE EXTERNAL TABLE {0}.country_postgres (
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP)
STORED BY JDBC
TBLPROPERTIES (
"database.type"="POSTGRES",
"jdbc.url"="jdbc:postgresql://localhost:5432/functional",
"jdbc.auth"="AuthMech=0",
"jdbc.driver"="org.postgresql.Driver",
"driver.url"="{1}",
"dbcp.username"="hiveuser",
"dbcp.password"="password",
"table"="country");
DROP TABLE IF EXISTS {0}.quoted_col;
CREATE EXTERNAL TABLE {0}.quoted_col
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
`freeze` STRING,
timestamp_col TIMESTAMP
)
STORED BY JDBC
TBLPROPERTIES (
"database.type"="POSTGRES",
"jdbc.url"="jdbc:postgresql://localhost:5432/functional",
"jdbc.auth"="AuthMech=0",
"jdbc.driver"="org.postgresql.Driver",
"driver.url"="{1}",
"dbcp.username"="hiveuser",
"dbcp.password"="password",
"table"="quoted_col"
);
DROP TABLE IF EXISTS {0}.country_keystore_postgres;
CREATE EXTERNAL TABLE {0}.country_keystore_postgres (
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP)
STORED BY JDBC
TBLPROPERTIES (
"database.type"="POSTGRES",
"jdbc.url"="jdbc:postgresql://localhost:5432/functional",
"jdbc.auth"="AuthMech=0",
"jdbc.driver"="org.postgresql.Driver",
"driver.url"="{1}",
"dbcp.username"="hiveuser",
"dbcp.password"="password",
"table"="country");
DROP TABLE IF EXISTS {0}.country_postgres_query;
CREATE EXTERNAL TABLE {0}.country_postgres_query (
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP)
STORED BY JDBC
TBLPROPERTIES (
"database.type"="POSTGRES",
"jdbc.url"="jdbc:postgresql://localhost:5432/functional",
"jdbc.auth"="AuthMech=0",
"jdbc.driver"="org.postgresql.Driver",
"driver.url"="{1}",
"dbcp.username"="hiveuser",
"dbcp.password"="password",
"query"="select id,name,bool_col,tinyint_col,smallint_col,
int_col,bigint_col,float_col,double_col,date_col,string_col,
timestamp_col from country");
DROP TABLE IF EXISTS {0}.country_keystore_postgres_query;
CREATE EXTERNAL TABLE {0}.country_keystore_postgres_query (
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP)
STORED BY JDBC
TBLPROPERTIES (
"database.type"="POSTGRES",
"jdbc.url"="jdbc:postgresql://localhost:5432/functional",
"jdbc.auth"="AuthMech=0",
"jdbc.driver"="org.postgresql.Driver",
"driver.url"="{1}",
"dbcp.username"="hiveuser",
"dbcp.password"="password",
"query"="select id,name,bool_col,tinyint_col,smallint_col,
int_col,bigint_col,float_col,double_col,date_col,string_col,
timestamp_col from country");
""".format(unique_database, driver_url)
# Split into statements and execute one-by-one.
stmts = [s.strip() for s in sql.split(';')]
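    # Note: split(';') leaves a trailing empty string, which is skipped below.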
for i, stmt in enumerate(stmts):
if not stmt:
continue
# Optional: skip pure comment lines (if any)
if stmt.startswith('--') or stmt.startswith('/*'):
continue
# Log the statement (truncate for readability)
truncated = (stmt[:200] + '...') if len(stmt) > 200 else stmt
print("\n[DEBUG] Executing statement #%d:\n%s\n" % (i + 1, truncated))
try:
        # Execute the statement through the Impala client.
self.client.execute(stmt + ';')
except Exception as e:
print("\n[DEBUG] Statement #%d failed." % (i + 1))
print("[DEBUG] Exception type:", type(e))
print("[DEBUG] Exception message:", str(e))
raise
self.client.execute("INVALIDATE METADATA {0}.country_postgres"
.format(unique_database))
self.client.execute("INVALIDATE METADATA {0}.country_keystore_postgres"
.format(unique_database))
self.client.execute("INVALIDATE METADATA {0}.country_postgres_query"
.format(unique_database))
self.client.execute("INVALIDATE METADATA {0}"
".country_keystore_postgres_query"
.format(unique_database))
self.client.execute("DESCRIBE {0}.country_postgres_query"
.format(unique_database))
self.client.execute("DESCRIBE {0}"
".country_keystore_postgres_query"
.format(unique_database))
self.run_test_case('QueryTest/hive-jdbc-postgres-tables',
vector, use_db=unique_database)
def test_invalid_postgres_jdbc_table(self, unique_database):
sql_both_set = """
CREATE EXTERNAL TABLE {0}.invalid_both_props (
id INT,
name STRING
)
STORED BY JDBC
TBLPROPERTIES (
"database.type"="POSTGRES",
"jdbc.url"="jdbc:postgresql://localhost:5432/functional",
"jdbc.auth"="AuthMech=0",
"jdbc.driver"="org.postgresql.Driver",
"driver.url"="/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
"dbcp.username"="hiveuser",
"dbcp.password"="password",
"table"="country",
"query"="SELECT * FROM country");
""".format(unique_database)
with pytest.raises(Exception, match="Only one of 'table' or 'query' should be set"):
self.run_stmt_in_hive(sql_both_set)
sql_none_set = """
CREATE EXTERNAL TABLE {0}.invalid_no_props (
id INT,
name STRING
)
STORED BY JDBC
TBLPROPERTIES (
"database.type"="POSTGRES",
"jdbc.url"="jdbc:postgresql://localhost:5432/functional",
"jdbc.auth"="AuthMech=0",
"jdbc.driver"="org.postgresql.Driver",
"driver.url"="/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar",
"dbcp.username"="hiveuser",
"dbcp.password"="password");
""".format(unique_database)
with pytest.raises(Exception, match="Either 'table' or 'query' must be set"):
self.run_stmt_in_hive(sql_none_set)


class TestHivePostgresJdbcTables(CustomClusterTestSuite):
  """Tests for Hive JDBC Postgres tables."""
@classmethod
def setup_class(cls):
super(TestHivePostgresJdbcTables, cls).setup_class()
@pytest.mark.execute_serially
def test_postgres_hive_jdbc_tables(self, vector, unique_database):
"""Run tests for external hive jdbc tables."""
hive_sql = """
DROP TABLE IF EXISTS {0}.country_postgres;
CREATE EXTERNAL TABLE {0}.country_postgres
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "POSTGRES",
"hive.sql.jdbc.driver" = "org.postgresql.Driver",
"hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password",
"hive.sql.table" = "country"
);
DROP TABLE IF EXISTS {0}.quoted_col;
CREATE EXTERNAL TABLE {0}.quoted_col
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
`freeze` STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "POSTGRES",
"hive.sql.jdbc.driver" = "org.postgresql.Driver",
"hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password",
"hive.sql.table" = "quoted_col"
);
DROP TABLE IF EXISTS {0}.country_keystore_postgres;
CREATE EXTERNAL TABLE {0}.country_keystore_postgres
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "POSTGRES",
"hive.sql.jdbc.driver" = "org.postgresql.Driver",
"hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password.keystore" =
"jceks://hdfs/test-warehouse/data-sources/test.jceks",
"hive.sql.dbcp.password.key" = "hiveuser",
"hive.sql.table" = "country"
);
DROP TABLE IF EXISTS {0}.country_postgres_query;
CREATE EXTERNAL TABLE {0}.country_postgres_query
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "POSTGRES",
"hive.sql.jdbc.driver" = "org.postgresql.Driver",
"hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password",
"hive.sql.query" = "select id,name,bool_col,tinyint_col,smallint_col,
int_col,bigint_col,float_col,double_col,date_col,string_col,
timestamp_col from country"
);
DROP TABLE IF EXISTS {0}.country_keystore_postgres_query;
CREATE EXTERNAL TABLE {0}.country_keystore_postgres_query
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "POSTGRES",
"hive.sql.jdbc.driver" = "org.postgresql.Driver",
"hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password.keystore" =
"jceks://hdfs/test-warehouse/data-sources/test.jceks",
"hive.sql.dbcp.password.key" = "hiveuser",
"hive.sql.query" = "select id,name,bool_col,tinyint_col,smallint_col,
int_col,bigint_col,float_col,double_col,date_col,string_col,
timestamp_col from country"
);
""".format(unique_database)
try:
self.run_stmt_in_hive(hive_sql)
except Exception:
pytest.xfail(reason="Can't create hive jdbc table.")
self.client.execute("INVALIDATE METADATA {0}.country_postgres".
format(unique_database))
self.client.execute("INVALIDATE METADATA {0}.country_keystore_postgres".
format(unique_database))
self.client.execute("INVALIDATE METADATA {0}.country_postgres_query".
format(unique_database))
self.client.execute("INVALIDATE METADATA {0}.country_keystore_postgres_query".
format(unique_database))
    # Describe the Postgres Hive JDBC tables in Impala.
self.client.execute("DESCRIBE {0}.country_postgres_query".format(unique_database))
self.client.execute("DESCRIBE {0}.country_keystore_postgres_query"
.format(unique_database))
# Select statements are verified in hive-jdbc-postgres-tables.test.
self.run_test_case('QueryTest/hive-jdbc-postgres-tables', vector,
use_db=unique_database)
def test_invalid_postgres_hive_jdbc_table(self, unique_database):
"""Negative tests for hive jdbc tables with postgres"""
# Both hive.sql.table and hive.sql.query are set
sql_both_set = """
CREATE EXTERNAL TABLE {0}.invalid_both_props (
id INT,
name STRING
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "POSTGRES",
"hive.sql.jdbc.driver" = "org.postgresql.Driver",
"hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password",
"hive.sql.table" = "country",
"hive.sql.query" = "SELECT * FROM country"
)
""".format(unique_database)
with pytest.raises(Exception, match="Only one of 'hive.sql.table' or"
" 'hive.sql.query' should be set"): self.run_stmt_in_hive(sql_both_set)
# Neither hive.sql.table nor hive.sql.query is set
sql_none_set = """
CREATE EXTERNAL TABLE {0}.invalid_no_props (
id INT,
name STRING
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "POSTGRES",
"hive.sql.jdbc.driver" = "org.postgresql.Driver",
"hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password"
)
""".format(unique_database)
with pytest.raises(Exception, match="Either 'hive.sql.table' or"
" 'hive.sql.query' must be set"): self.run_stmt_in_hive(sql_none_set)


class TestMySqlExtJdbcTables(CustomClusterTestSuite):
  """Impala query tests for external JDBC tables on a MySQL server.
  It also includes tests for external Hive JDBC tables on MySQL."""
@classmethod
def _setup_mysql_test_env(cls):
# Download MySQL docker image and jdbc driver, start MySQL server, create database
# and tables, create user account, load testing data, copy jdbc driver to HDFS, etc.
script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/setup-mysql-env.sh')
run_cmd = [script]
try:
subprocess.check_call(run_cmd, close_fds=True)
except subprocess.CalledProcessError as e:
if e.returncode == 10:
pytest.skip("These tests required the docker to be added to sudoer's group")
elif e.returncode == 20:
pytest.skip("Can't connect to local MySQL server")
elif e.returncode == 30:
pytest.skip("File /var/run/mysqld/mysqld.sock not found")
else:
        # Creation of the MySQL docker container or startup of mysqld can fail for
        # multiple reasons; this may be intermittent, so re-running the test can help.
pytest.xfail(reason="Failed to setup MySQL testing environment")
@classmethod
def _remove_mysql_test_env(cls):
# Tear down MySQL server, remove its docker image, etc.
script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/clean-mysql-env.sh')
run_cmd = [script]
subprocess.check_call(run_cmd, close_fds=True)
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('These tests only run in exhaustive')
cls._setup_mysql_test_env()
super(TestMySqlExtJdbcTables, cls).setup_class()
@classmethod
def teardown_class(cls):
cls._remove_mysql_test_env()
super(TestMySqlExtJdbcTables, cls).teardown_class()
@pytest.mark.execute_serially
def test_mysql_ext_jdbc_tables(self, vector, unique_database):
"""Run tests for external jdbc tables on MySQL"""
self.run_test_case('QueryTest/mysql-ext-jdbc-tables', vector, use_db=unique_database)
@pytest.mark.execute_serially
def test_mysql_hive_jdbc_tables(self, vector, unique_database):
""" Run tests for external hive jdbc tables on mysql"""
hive_sql = """
ADD JAR hdfs:///test-warehouse/data-sources/jdbc-drivers/mysql-jdbc.jar;
DROP TABLE IF EXISTS {0}.country_mysql;
CREATE EXTERNAL TABLE {0}.country_mysql
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password",
"hive.sql.table" = "country"
);
DROP TABLE IF EXISTS {0}.quoted_col;
CREATE EXTERNAL TABLE {0}.quoted_col
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
`freeze` STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password",
"hive.sql.table" = "quoted_col"
);
DROP TABLE IF EXISTS {0}.country_keystore_mysql;
CREATE EXTERNAL TABLE {0}.country_keystore_mysql
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password.keystore" =
"jceks://hdfs/test-warehouse/data-sources/test.jceks",
"hive.sql.dbcp.password.key" = "hiveuser",
"hive.sql.table" = "country"
);
DROP TABLE IF EXISTS {0}.country_mysql_query;
CREATE EXTERNAL TABLE {0}.country_mysql_query
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password",
"hive.sql.query" = "select * from country"
);
DROP TABLE IF EXISTS {0}.country_keystore_mysql_query;
CREATE EXTERNAL TABLE {0}.country_keystore_mysql_query
(
id INT,
name STRING,
bool_col BOOLEAN,
tinyint_col SMALLINT,
smallint_col SMALLINT,
int_col INT,
bigint_col BIGINT,
float_col FLOAT,
double_col DOUBLE,
date_col DATE,
string_col STRING,
timestamp_col TIMESTAMP
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password.keystore" =
"jceks://hdfs/test-warehouse/data-sources/test.jceks",
"hive.sql.dbcp.password.key" = "hiveuser",
"hive.sql.query" = "select * from country"
);
""".format(unique_database)
try:
self.run_stmt_in_hive(hive_sql)
except Exception:
pytest.xfail(reason="Can't create hive jdbc table.")
self.client.execute("INVALIDATE METADATA {0}.country_mysql"
.format(unique_database))
self.client.execute("INVALIDATE METADATA {0}.country_keystore_mysql"
.format(unique_database))
self.client.execute("INVALIDATE METADATA {0}.country_mysql_query"
.format(unique_database))
self.client.execute("INVALIDATE METADATA {0}.country_keystore_mysql_query"
.format(unique_database))
    # Describe the MySQL Hive JDBC tables in Impala.
self.client.execute("DESCRIBE {0}.country_mysql".format(unique_database))
self.client.execute("DESCRIBE {0}.country_keystore_mysql".format(unique_database))
self.client.execute("DESCRIBE {0}.country_mysql_query".format(unique_database))
self.client.execute("DESCRIBE {0}.country_keystore_mysql_query"
.format(unique_database))
# Select statements are verified in hive-jdbc-mysql-tables.test.
self.run_test_case('QueryTest/hive-jdbc-mysql-tables', vector,
use_db=unique_database)
@pytest.mark.execute_serially
def test_invalid_mysql_hive_jdbc_table_properties(self, unique_database):
"""Negative tests for hive jdbc tables with hive"""
add_jar_stmt =\
"ADD JAR hdfs:///test-warehouse/data-sources/jdbc-drivers/mysql-jdbc.jar;"
self.run_stmt_in_hive(add_jar_stmt)
# Both hive.sql.table and hive.sql.query are set
sql_both_set = """
CREATE EXTERNAL TABLE {0}.invalid_both_props_mysql (
id INT,
name STRING
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password",
"hive.sql.table" = "country",
"hive.sql.query" = "SELECT id, name FROM country"
)
""".format(unique_database)
with pytest.raises(Exception, match="Only one of 'hive.sql.table' or"
" 'hive.sql.query' should be set"): self.run_stmt_in_hive(sql_both_set)
# Neither hive.sql.table nor hive.sql.query is set
sql_none_set = """
CREATE EXTERNAL TABLE {0}.invalid_no_props_mysql (
id INT,
name STRING
)
STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler'
TBLPROPERTIES (
"hive.sql.database.type" = "MYSQL",
"hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver",
"hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional",
"hive.sql.dbcp.username" = "hiveuser",
"hive.sql.dbcp.password" = "password"
)
""".format(unique_database)
with pytest.raises(Exception, match="Either 'hive.sql.table' or"
" 'hive.sql.query' must be set"): self.run_stmt_in_hive(sql_none_set)


class TestImpalaExtJdbcTables(CustomClusterTestSuite):
  """Impala query tests for external JDBC tables in an Impala cluster."""
@classmethod
def add_test_dimensions(cls):
super(TestImpalaExtJdbcTables, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
exec_single_node_option=[100]))
@classmethod
def _download_impala_jdbc_driver(cls):
# Download Impala jdbc driver and copy jdbc driver to HDFS.
script = os.path.join(
os.environ['IMPALA_HOME'], 'testdata/bin/download-impala-jdbc-driver.sh')
run_cmd = [script]
try:
subprocess.check_call(run_cmd, close_fds=True)
except subprocess.CalledProcessError:
assert False, "Failed to download Impala JDBC driver"
@classmethod
def setup_class(cls):
cls._download_impala_jdbc_driver()
super(TestImpalaExtJdbcTables, cls).setup_class()
@classmethod
def teardown_class(cls):
super(TestImpalaExtJdbcTables, cls).teardown_class()
@pytest.mark.execute_serially
def test_impala_ext_jdbc_tables(self, vector, unique_database):
"""Run tests for external jdbc tables in Impala cluster"""
self.run_test_case(
'QueryTest/impala-ext-jdbc-tables', vector, use_db=unique_database)
    # Verify the settings of query options via the /queries web page on the Impala
    # coordinator.
response = requests.get("http://localhost:25000/queries?json")
response_json = response.text
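    # Double quotes inside option values are JSON-escaped in the response, hence
    # the \\\" escapes in the expected strings below.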
assert "SET MAX_ERRORS=10000" in response_json, \
"No matching option MAX_ERRORS found in the queries site."
assert "SET MEM_LIMIT=1000000000" in response_json, \
"No matching option MEM_LIMIT found in the queries site."
assert "SET ENABLED_RUNTIME_FILTER_TYPES=\\\"BLOOM,MIN_MAX\\\"" in response_json or \
"SET ENABLED_RUNTIME_FILTER_TYPES='BLOOM,MIN_MAX'" in response_json, \
"No matching option ENABLED_RUNTIME_FILTER_TYPES found in the queries site."
assert "SET QUERY_TIMEOUT_S=600" in response_json, \
"No matching option QUERY_TIMEOUT_S found in the queries site."
assert "SET REQUEST_POOL=\\\"default-pool\\\"" in response_json, \
"No matching option REQUEST_POOL found in the queries site."
assert "SET DEBUG_ACTION" not in response_json, \
"Matching option DEBUG_ACTION found in the queries site."
@pytest.mark.execute_serially
def test_impala_ext_jdbc_tables_predicates(self, vector, unique_database):
"""Run tests for external jdbc tables in Impala cluster for new predicates"""
self.run_test_case(
'QueryTest/impala-ext-jdbc-tables-predicates', vector, use_db=unique_database)