# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import absolute_import, division, print_function import os import pytest import requests import subprocess from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.environ import build_flavor_timeout from tests.common.skip import SkipIfApacheHive from tests.common.test_dimensions import create_exec_option_dimension from tests.util.filesystem_utils import FILESYSTEM_PREFIX from time import sleep class TestExtDataSources(CustomClusterTestSuite): """Impala query tests for external data sources.""" @classmethod def add_test_dimensions(cls): super(TestExtDataSources, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( exec_single_node_option=[100])) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", catalogd_args="--catalog_topic_mode=minimal") def test_data_source_tables(self, vector, unique_database, unique_name): """Start Impala cluster in LocalCatalog Mode""" self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database, test_file_vars={'$UNIQUE_DATASOURCE': unique_name}) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", catalogd_args="--catalog_topic_mode=minimal") def test_jdbc_data_source(self, vector, unique_database): """Start Impala cluster in LocalCatalog Mode""" self.run_test_case('QueryTest/jdbc-data-source', vector, use_db=unique_database) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args='--data_source_batch_size=2048') def test_data_source_big_batch_size(self, vector, unique_database, unique_name): """Run test with batch size greater than default size 1024""" self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database, test_file_vars={'$UNIQUE_DATASOURCE': unique_name}) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args='--data_source_batch_size=512') def test_data_source_small_batch_size(self, vector, unique_database, unique_name): """Run test with batch size less than default size 1024""" self.run_test_case('QueryTest/data-source-tables', vector, use_db=unique_database, test_file_vars={'$UNIQUE_DATASOURCE': unique_name}) @SkipIfApacheHive.data_connector_not_supported @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( statestored_args="--statestore_update_frequency_ms=1000") def test_restart_catalogd(self): """Restart Catalog server after creating a data source. Verify that the data source object is persistent across restarting of Catalog server.""" DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_restart_persistent" CREATE_DATA_SOURCE_QUERY = "CREATE DATA SOURCE test_restart_persistent " \ "LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar' " \ "CLASS 'org.apache.impala.extdatasource.jdbc.PersistentJdbcDataSource' " \ "API_VERSION 'V1'" SHOW_DATA_SOURCE_QUERY = "SHOW DATA SOURCES LIKE 'test_restart_*'" # Create a data source and verify that the object is created successfully. self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY) self.execute_query_expect_success(self.client, CREATE_DATA_SOURCE_QUERY) result = self.execute_query(SHOW_DATA_SOURCE_QUERY) assert result.success, str(result) assert "PersistentJdbcDataSource" in result.get_data() # Restart Catalog server. self.cluster.catalogd.restart() wait_time_s = build_flavor_timeout(90, slow_build_timeout=180) self.cluster.statestored.service.wait_for_metric_value('statestore.live-backends', expected_value=4, timeout=wait_time_s) # Verify that the data source object is still available after restarting Catalog # server. result = self.execute_query(SHOW_DATA_SOURCE_QUERY) assert result.success, str(result) assert "PersistentJdbcDataSource" in result.get_data() # Remove the data source self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY) result = self.execute_query(SHOW_DATA_SOURCE_QUERY) assert result.success, str(result) assert "PersistentJdbcDataSource" not in result.get_data() @SkipIfApacheHive.data_connector_not_supported @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( statestored_args="--use_subscriber_id_as_catalogd_priority=true " "--statestore_heartbeat_frequency_ms=1000", catalogd_args="--catalogd_ha_reset_metadata_on_failover=false " "--enable_reload_events=true", start_args="--enable_catalogd_ha") def test_catalogd_ha_failover(self): """The test case for cluster started with catalogd HA enabled.""" DROP_DATA_SOURCE_QUERY = "DROP DATA SOURCE IF EXISTS test_failover_persistent" CREATE_DATA_SOURCE_QUERY = "CREATE DATA SOURCE test_failover_persistent " \ "LOCATION '/test-warehouse/data-sources/jdbc-data-source.jar' " \ "CLASS 'org.apache.impala.extdatasource.jdbc.FailoverInSyncJdbcDataSource' " \ "API_VERSION 'V1'" SHOW_DATA_SOURCE_QUERY = "SHOW DATA SOURCES LIKE 'test_failover_*'" # Verify two catalogd instances are created with one as active. catalogds = self.cluster.catalogds() assert(len(catalogds) == 2) catalogd_service_1 = catalogds[0].service catalogd_service_2 = catalogds[1].service assert(catalogd_service_1.get_metric_value("catalog-server.active-status")) assert(not catalogd_service_2.get_metric_value("catalog-server.active-status")) # Create a data source and verify that the object is created successfully. self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY) self.execute_query_expect_success(self.client, CREATE_DATA_SOURCE_QUERY) result = self.execute_query(SHOW_DATA_SOURCE_QUERY) assert result.success, str(result) assert "FailoverInSyncJdbcDataSource" in result.get_data() # Kill active catalogd catalogds[0].kill() # Wait for long enough for the statestore to detect the failure of active catalogd # and assign active role to standby catalogd. catalogd_service_2.wait_for_metric_value( "catalog-server.active-status", expected_value=True, timeout=30) assert(catalogd_service_2.get_metric_value("catalog-server.active-status")) # Wait until coordinator receive failover notification. coordinator_service = self.cluster.impalads[0].service expected_catalog_service_port = catalogd_service_2.get_catalog_service_port() received_failover_notification = False retry_count = 30 while (retry_count > 0): active_catalogd_address = \ coordinator_service.get_metric_value("catalog.active-catalogd-address") _, catalog_service_port = active_catalogd_address.split(":") if (int(catalog_service_port) == expected_catalog_service_port): received_failover_notification = True break retry_count -= 1 sleep(1) assert received_failover_notification, \ "Coordinator did not receive notification of Catalog service failover." # Verify that the data source object is available in the catalogd of HA pair. result = self.execute_query(SHOW_DATA_SOURCE_QUERY) assert result.success, str(result) assert "FailoverInSyncJdbcDataSource" in result.get_data() # Remove the data source self.execute_query_expect_success(self.client, DROP_DATA_SOURCE_QUERY) result = self.execute_query(SHOW_DATA_SOURCE_QUERY) assert result.success, str(result) assert "FailoverInSyncJdbcDataSource" not in result.get_data() class TestPostgresJdbcTables(CustomClusterTestSuite): @classmethod def get_workload(cls): return 'functional-query' @classmethod def setup_class(cls): super(TestPostgresJdbcTables, cls).setup_class() @pytest.mark.execute_serially def test_postgres_jdbc_tables(self, vector, unique_database): driver_url = FILESYSTEM_PREFIX +\ "/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar" sql = """ DROP TABLE IF EXISTS {0}.country_postgres; CREATE EXTERNAL TABLE {0}.country_postgres ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP) STORED BY JDBC TBLPROPERTIES ( "database.type"="POSTGRES", "jdbc.url"="jdbc:postgresql://localhost:5432/functional", "jdbc.auth"="AuthMech=0", "jdbc.driver"="org.postgresql.Driver", "driver.url"="{1}", "dbcp.username"="hiveuser", "dbcp.password"="password", "table"="country"); DROP TABLE IF EXISTS {0}.quoted_col; CREATE EXTERNAL TABLE {0}.quoted_col ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, `freeze` STRING, timestamp_col TIMESTAMP ) STORED BY JDBC TBLPROPERTIES ( "database.type"="POSTGRES", "jdbc.url"="jdbc:postgresql://localhost:5432/functional", "jdbc.auth"="AuthMech=0", "jdbc.driver"="org.postgresql.Driver", "driver.url"="{1}", "dbcp.username"="hiveuser", "dbcp.password"="password", "table"="quoted_col" ); DROP TABLE IF EXISTS {0}.country_keystore_postgres; CREATE EXTERNAL TABLE {0}.country_keystore_postgres ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP) STORED BY JDBC TBLPROPERTIES ( "database.type"="POSTGRES", "jdbc.url"="jdbc:postgresql://localhost:5432/functional", "jdbc.auth"="AuthMech=0", "jdbc.driver"="org.postgresql.Driver", "driver.url"="{1}", "dbcp.username"="hiveuser", "dbcp.password"="password", "table"="country"); DROP TABLE IF EXISTS {0}.country_postgres_query; CREATE EXTERNAL TABLE {0}.country_postgres_query ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP) STORED BY JDBC TBLPROPERTIES ( "database.type"="POSTGRES", "jdbc.url"="jdbc:postgresql://localhost:5432/functional", "jdbc.auth"="AuthMech=0", "jdbc.driver"="org.postgresql.Driver", "driver.url"="{1}", "dbcp.username"="hiveuser", "dbcp.password"="password", "query"="select id,name,bool_col,tinyint_col,smallint_col, int_col,bigint_col,float_col,double_col,date_col,string_col, timestamp_col from country"); DROP TABLE IF EXISTS {0}.country_keystore_postgres_query; CREATE EXTERNAL TABLE {0}.country_keystore_postgres_query ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP) STORED BY JDBC TBLPROPERTIES ( "database.type"="POSTGRES", "jdbc.url"="jdbc:postgresql://localhost:5432/functional", "jdbc.auth"="AuthMech=0", "jdbc.driver"="org.postgresql.Driver", "driver.url"="{1}", "dbcp.username"="hiveuser", "dbcp.password"="password", "query"="select id,name,bool_col,tinyint_col,smallint_col, int_col,bigint_col,float_col,double_col,date_col,string_col, timestamp_col from country"); """.format(unique_database, driver_url) ''' try: self.client.execute(sql) except Exception as e: print("\n[DEBUG] Failed to create JDBC table") print("[DEBUG] Exception type:", type(e)) print("[DEBUG] Exception message:", str(e)) print("[DEBUG] Traceback:\n" + "".join(traceback.format_tb(e.__traceback__))) pytest.xfail(reason="Can't create JDBC table.") ''' # Split into statements and execute one-by-one. stmts = [s.strip() for s in sql.split(';')] for i, stmt in enumerate(stmts): if not stmt: continue # Optional: skip pure comment lines (if any) if stmt.startswith('--') or stmt.startswith('/*'): continue # Log the statement (truncate for readability) truncated = (stmt[:200] + '...') if len(stmt) > 200 else stmt print("\n[DEBUG] Executing statement #%d:\n%s\n" % (i + 1, truncated)) try: # Use run_stmt_in_hive as before (this is what the test harness uses). self.client.execute(stmt + ';') except Exception as e: print("\n[DEBUG] Statement #%d failed." % (i + 1)) print("[DEBUG] Exception type:", type(e)) print("[DEBUG] Exception message:", str(e)) raise self.client.execute("INVALIDATE METADATA {0}.country_postgres" .format(unique_database)) self.client.execute("INVALIDATE METADATA {0}.country_keystore_postgres" .format(unique_database)) self.client.execute("INVALIDATE METADATA {0}.country_postgres_query" .format(unique_database)) self.client.execute("INVALIDATE METADATA {0}" ".country_keystore_postgres_query" .format(unique_database)) self.client.execute("DESCRIBE {0}.country_postgres_query" .format(unique_database)) self.client.execute("DESCRIBE {0}" ".country_keystore_postgres_query" .format(unique_database)) self.run_test_case('QueryTest/hive-jdbc-postgres-tables', vector, use_db=unique_database) def test_invalid_postgres_jdbc_table(self, unique_database): sql_both_set = """ CREATE EXTERNAL TABLE {0}.invalid_both_props ( id INT, name STRING ) STORED BY JDBC TBLPROPERTIES ( "database.type"="POSTGRES", "jdbc.url"="jdbc:postgresql://localhost:5432/functional", "jdbc.auth"="AuthMech=0", "jdbc.driver"="org.postgresql.Driver", "driver.url"="/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar", "dbcp.username"="hiveuser", "dbcp.password"="password", "table"="country", "query"="SELECT * FROM country"); """.format(unique_database) with pytest.raises(Exception, match="Only one of 'table' or 'query' should be set"): self.run_stmt_in_hive(sql_both_set) sql_none_set = """ CREATE EXTERNAL TABLE {0}.invalid_no_props ( id INT, name STRING ) STORED BY JDBC TBLPROPERTIES ( "database.type"="POSTGRES", "jdbc.url"="jdbc:postgresql://localhost:5432/functional", "jdbc.auth"="AuthMech=0", "jdbc.driver"="org.postgresql.Driver", "driver.url"="/test-warehouse/data-sources/jdbc-drivers/postgresql-jdbc.jar", "dbcp.username"="hiveuser", "dbcp.password"="password"); """.format(unique_database) with pytest.raises(Exception, match="Either 'table' or 'query' must be set"): self.run_stmt_in_hive(sql_none_set) class TestHivePostgresJdbcTables(CustomClusterTestSuite): """Tests for hive jdbc postgres tables. """ @classmethod def setup_class(cls): super(TestHivePostgresJdbcTables, cls).setup_class() @pytest.mark.execute_serially def test_postgres_hive_jdbc_tables(self, vector, unique_database): """Run tests for external hive jdbc tables.""" hive_sql = """ DROP TABLE IF EXISTS {0}.country_postgres; CREATE EXTERNAL TABLE {0}.country_postgres ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "POSTGRES", "hive.sql.jdbc.driver" = "org.postgresql.Driver", "hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password", "hive.sql.table" = "country" ); DROP TABLE IF EXISTS {0}.quoted_col; CREATE EXTERNAL TABLE {0}.quoted_col ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, `freeze` STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "POSTGRES", "hive.sql.jdbc.driver" = "org.postgresql.Driver", "hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password", "hive.sql.table" = "quoted_col" ); DROP TABLE IF EXISTS {0}.country_keystore_postgres; CREATE EXTERNAL TABLE {0}.country_keystore_postgres ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "POSTGRES", "hive.sql.jdbc.driver" = "org.postgresql.Driver", "hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password.keystore" = "jceks://hdfs/test-warehouse/data-sources/test.jceks", "hive.sql.dbcp.password.key" = "hiveuser", "hive.sql.table" = "country" ); DROP TABLE IF EXISTS {0}.country_postgres_query; CREATE EXTERNAL TABLE {0}.country_postgres_query ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "POSTGRES", "hive.sql.jdbc.driver" = "org.postgresql.Driver", "hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password", "hive.sql.query" = "select id,name,bool_col,tinyint_col,smallint_col, int_col,bigint_col,float_col,double_col,date_col,string_col, timestamp_col from country" ); DROP TABLE IF EXISTS {0}.country_keystore_postgres_query; CREATE EXTERNAL TABLE {0}.country_keystore_postgres_query ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "POSTGRES", "hive.sql.jdbc.driver" = "org.postgresql.Driver", "hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password.keystore" = "jceks://hdfs/test-warehouse/data-sources/test.jceks", "hive.sql.dbcp.password.key" = "hiveuser", "hive.sql.query" = "select id,name,bool_col,tinyint_col,smallint_col, int_col,bigint_col,float_col,double_col,date_col,string_col, timestamp_col from country" ); """.format(unique_database) try: self.run_stmt_in_hive(hive_sql) except Exception: pytest.xfail(reason="Can't create hive jdbc table.") self.client.execute("INVALIDATE METADATA {0}.country_postgres". format(unique_database)) self.client.execute("INVALIDATE METADATA {0}.country_keystore_postgres". format(unique_database)) self.client.execute("INVALIDATE METADATA {0}.country_postgres_query". format(unique_database)) self.client.execute("INVALIDATE METADATA {0}.country_keystore_postgres_query". format(unique_database)) # Describing postgres hive jdbc table in Impala. self.client.execute("DESCRIBE {0}.country_postgres_query".format(unique_database)) self.client.execute("DESCRIBE {0}.country_keystore_postgres_query" .format(unique_database)) # Select statements are verified in hive-jdbc-postgres-tables.test. self.run_test_case('QueryTest/hive-jdbc-postgres-tables', vector, use_db=unique_database) def test_invalid_postgres_hive_jdbc_table(self, unique_database): """Negative tests for hive jdbc tables with postgres""" # Both hive.sql.table and hive.sql.query are set sql_both_set = """ CREATE EXTERNAL TABLE {0}.invalid_both_props ( id INT, name STRING ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "POSTGRES", "hive.sql.jdbc.driver" = "org.postgresql.Driver", "hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password", "hive.sql.table" = "country", "hive.sql.query" = "SELECT * FROM country" ) """.format(unique_database) with pytest.raises(Exception, match="Only one of 'hive.sql.table' or" " 'hive.sql.query' should be set"): self.run_stmt_in_hive(sql_both_set) # Neither hive.sql.table nor hive.sql.query is set sql_none_set = """ CREATE EXTERNAL TABLE {0}.invalid_no_props ( id INT, name STRING ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "POSTGRES", "hive.sql.jdbc.driver" = "org.postgresql.Driver", "hive.sql.jdbc.url" = "jdbc:postgresql://localhost:5432/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password" ) """.format(unique_database) with pytest.raises(Exception, match="Either 'hive.sql.table' or" " 'hive.sql.query' must be set"): self.run_stmt_in_hive(sql_none_set) class TestMySqlExtJdbcTables(CustomClusterTestSuite): """Impala query tests for external jdbc tables on MySQL server. It also includes tests for external hive jdbc tables on mysql.""" @classmethod def _setup_mysql_test_env(cls): # Download MySQL docker image and jdbc driver, start MySQL server, create database # and tables, create user account, load testing data, copy jdbc driver to HDFS, etc. script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/setup-mysql-env.sh') run_cmd = [script] try: subprocess.check_call(run_cmd, close_fds=True) except subprocess.CalledProcessError as e: if e.returncode == 10: pytest.skip("These tests required the docker to be added to sudoer's group") elif e.returncode == 20: pytest.skip("Can't connect to local MySQL server") elif e.returncode == 30: pytest.skip("File /var/run/mysqld/mysqld.sock not found") else: # The mysql docker container creation and mysqld can fail due to multiple # reasons. This could be an Intermittent issue and need to re-run the test. pytest.xfail(reason="Failed to setup MySQL testing environment") @classmethod def _remove_mysql_test_env(cls): # Tear down MySQL server, remove its docker image, etc. script = os.path.join(os.environ['IMPALA_HOME'], 'testdata/bin/clean-mysql-env.sh') run_cmd = [script] subprocess.check_call(run_cmd, close_fds=True) @classmethod def setup_class(cls): if cls.exploration_strategy() != 'exhaustive': pytest.skip('These tests only run in exhaustive') cls._setup_mysql_test_env() super(TestMySqlExtJdbcTables, cls).setup_class() @classmethod def teardown_class(cls): cls._remove_mysql_test_env() super(TestMySqlExtJdbcTables, cls).teardown_class() @pytest.mark.execute_serially def test_mysql_ext_jdbc_tables(self, vector, unique_database): """Run tests for external jdbc tables on MySQL""" self.run_test_case('QueryTest/mysql-ext-jdbc-tables', vector, use_db=unique_database) @pytest.mark.execute_serially def test_mysql_hive_jdbc_tables(self, vector, unique_database): """ Run tests for external hive jdbc tables on mysql""" hive_sql = """ ADD JAR hdfs:///test-warehouse/data-sources/jdbc-drivers/mysql-jdbc.jar; DROP TABLE IF EXISTS {0}.country_mysql; CREATE EXTERNAL TABLE {0}.country_mysql ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "MYSQL", "hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver", "hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password", "hive.sql.table" = "country" ); DROP TABLE IF EXISTS {0}.quoted_col; CREATE EXTERNAL TABLE {0}.quoted_col ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, `freeze` STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "MYSQL", "hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver", "hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password", "hive.sql.table" = "quoted_col" ); DROP TABLE IF EXISTS {0}.country_keystore_mysql; CREATE EXTERNAL TABLE {0}.country_keystore_mysql ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "MYSQL", "hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver", "hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password.keystore" = "jceks://hdfs/test-warehouse/data-sources/test.jceks", "hive.sql.dbcp.password.key" = "hiveuser", "hive.sql.table" = "country" ); DROP TABLE IF EXISTS {0}.country_mysql_query; CREATE EXTERNAL TABLE {0}.country_mysql_query ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "MYSQL", "hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver", "hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password", "hive.sql.query" = "select * from country" ); DROP TABLE IF EXISTS {0}.country_keystore_mysql_query; CREATE EXTERNAL TABLE {0}.country_keystore_mysql_query ( id INT, name STRING, bool_col BOOLEAN, tinyint_col SMALLINT, smallint_col SMALLINT, int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, date_col DATE, string_col STRING, timestamp_col TIMESTAMP ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "MYSQL", "hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver", "hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password.keystore" = "jceks://hdfs/test-warehouse/data-sources/test.jceks", "hive.sql.dbcp.password.key" = "hiveuser", "hive.sql.query" = "select * from country" ); """.format(unique_database) try: self.run_stmt_in_hive(hive_sql) except Exception: pytest.xfail(reason="Can't create hive jdbc table.") self.client.execute("INVALIDATE METADATA {0}.country_mysql" .format(unique_database)) self.client.execute("INVALIDATE METADATA {0}.country_keystore_mysql" .format(unique_database)) self.client.execute("INVALIDATE METADATA {0}.country_mysql_query" .format(unique_database)) self.client.execute("INVALIDATE METADATA {0}.country_keystore_mysql_query" .format(unique_database)) # Describing mysql hive jdbc table in Impala. self.client.execute("DESCRIBE {0}.country_mysql".format(unique_database)) self.client.execute("DESCRIBE {0}.country_keystore_mysql".format(unique_database)) self.client.execute("DESCRIBE {0}.country_mysql_query".format(unique_database)) self.client.execute("DESCRIBE {0}.country_keystore_mysql_query" .format(unique_database)) # Select statements are verified in hive-jdbc-mysql-tables.test. self.run_test_case('QueryTest/hive-jdbc-mysql-tables', vector, use_db=unique_database) @pytest.mark.execute_serially def test_invalid_mysql_hive_jdbc_table_properties(self, unique_database): """Negative tests for hive jdbc tables with hive""" add_jar_stmt =\ "ADD JAR hdfs:///test-warehouse/data-sources/jdbc-drivers/mysql-jdbc.jar;" self.run_stmt_in_hive(add_jar_stmt) # Both hive.sql.table and hive.sql.query are set sql_both_set = """ CREATE EXTERNAL TABLE {0}.invalid_both_props_mysql ( id INT, name STRING ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "MYSQL", "hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver", "hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password", "hive.sql.table" = "country", "hive.sql.query" = "SELECT id, name FROM country" ) """.format(unique_database) with pytest.raises(Exception, match="Only one of 'hive.sql.table' or" " 'hive.sql.query' should be set"): self.run_stmt_in_hive(sql_both_set) # Neither hive.sql.table nor hive.sql.query is set sql_none_set = """ CREATE EXTERNAL TABLE {0}.invalid_no_props_mysql ( id INT, name STRING ) STORED BY 'org.apache.hive.storage.jdbc.JdbcStorageHandler' TBLPROPERTIES ( "hive.sql.database.type" = "MYSQL", "hive.sql.jdbc.driver" = "com.mysql.cj.jdbc.Driver", "hive.sql.jdbc.url" = "jdbc:mysql://localhost:3306/functional", "hive.sql.dbcp.username" = "hiveuser", "hive.sql.dbcp.password" = "password" ) """.format(unique_database) with pytest.raises(Exception, match="Either 'hive.sql.table' or" " 'hive.sql.query' must be set"): self.run_stmt_in_hive(sql_none_set) class TestImpalaExtJdbcTables(CustomClusterTestSuite): """Impala query tests for external jdbc tables in Impala cluster.""" @classmethod def add_test_dimensions(cls): super(TestImpalaExtJdbcTables, cls).add_test_dimensions() cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension( exec_single_node_option=[100])) @classmethod def _download_impala_jdbc_driver(cls): # Download Impala jdbc driver and copy jdbc driver to HDFS. script = os.path.join( os.environ['IMPALA_HOME'], 'testdata/bin/download-impala-jdbc-driver.sh') run_cmd = [script] try: subprocess.check_call(run_cmd, close_fds=True) except subprocess.CalledProcessError: assert False, "Failed to download Impala JDBC driver" @classmethod def setup_class(cls): cls._download_impala_jdbc_driver() super(TestImpalaExtJdbcTables, cls).setup_class() @classmethod def teardown_class(cls): super(TestImpalaExtJdbcTables, cls).teardown_class() @pytest.mark.execute_serially def test_impala_ext_jdbc_tables(self, vector, unique_database): """Run tests for external jdbc tables in Impala cluster""" self.run_test_case( 'QueryTest/impala-ext-jdbc-tables', vector, use_db=unique_database) # Verify the settings of query options with Queries Web page on Impala coordinator response = requests.get("http://localhost:25000/queries?json") response_json = response.text assert "SET MAX_ERRORS=10000" in response_json, \ "No matching option MAX_ERRORS found in the queries site." assert "SET MEM_LIMIT=1000000000" in response_json, \ "No matching option MEM_LIMIT found in the queries site." assert "SET ENABLED_RUNTIME_FILTER_TYPES=\\\"BLOOM,MIN_MAX\\\"" in response_json or \ "SET ENABLED_RUNTIME_FILTER_TYPES='BLOOM,MIN_MAX'" in response_json, \ "No matching option ENABLED_RUNTIME_FILTER_TYPES found in the queries site." assert "SET QUERY_TIMEOUT_S=600" in response_json, \ "No matching option QUERY_TIMEOUT_S found in the queries site." assert "SET REQUEST_POOL=\\\"default-pool\\\"" in response_json, \ "No matching option REQUEST_POOL found in the queries site." assert "SET DEBUG_ACTION" not in response_json, \ "Matching option DEBUG_ACTION found in the queries site." @pytest.mark.execute_serially def test_impala_ext_jdbc_tables_predicates(self, vector, unique_database): """Run tests for external jdbc tables in Impala cluster for new predicates""" self.run_test_case( 'QueryTest/impala-ext-jdbc-tables-predicates', vector, use_db=unique_database)